Merge pull request #267 from AppFlowy-IO/rust_stable_channel

switch to rust stable channel
This commit is contained in:
AppFlowy.IO 2022-01-22 20:47:13 -08:00 committed by GitHub
commit 8fa3590661
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
224 changed files with 2133 additions and 1318 deletions

View File

@ -51,8 +51,8 @@ jobs:
curl \
--proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source $HOME/.cargo/env
rustup toolchain install nightly
rustup default nightly
rustup toolchain install stable
rustup default stable
- name: Checkout Flutter
uses: actions/checkout@v2
with:

View File

@ -23,7 +23,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
toolchain: stable
override: true
- run: rustup component add rustfmt
working-directory: frontend/rust-lib
@ -38,7 +38,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
toolchain: stable
override: true
- run: rustup component add clippy
working-directory: frontend/rust-lib
@ -55,8 +55,8 @@ jobs:
curl \
--proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source $HOME/.cargo/env
rustup toolchain install nightly
rustup default nightly
rustup toolchain install stable
rustup default stable
- name: Frontend tests
working-directory: frontend/rust-lib
run: cargo test

View File

@ -1,2 +0,0 @@
[toolchain]
channel = "nightly-2021-04-24"

View File

@ -0,0 +1,2 @@
[toolchain]
channel = "stable-2022-01-20"

View File

@ -1,18 +1,18 @@
# https://rust-lang.github.io/rustfmt/?version=master&search=
max_width = 120
tab_spaces = 4
fn_single_line = true
match_block_trailing_comma = true
normalize_comments = true
wrap_comments = true
use_field_init_shorthand = true
use_try_shorthand = true
normalize_doc_attributes = true
report_todo = "Never"
report_fixme = "Always"
imports_layout = "HorizontalVertical"
imports_granularity = "Crate"
reorder_modules = true
reorder_imports = true
enum_discrim_align_threshold = 20
# fn_single_line = true
# match_block_trailing_comma = true
# normalize_comments = true
# wrap_comments = true
# use_field_init_shorthand = true
# use_try_shorthand = true
# normalize_doc_attributes = true
# report_todo = "Never"
# report_fixme = "Always"
# imports_layout = "HorizontalVertical"
# imports_granularity = "Crate"
# reorder_modules = true
# reorder_imports = true
# enum_discrim_align_threshold = 20
edition = "2018"

View File

@ -8,8 +8,7 @@ use tokio::time::interval;
use crate::{
config::{
env::{domain, secret, use_https},
DatabaseSettings,
Settings,
DatabaseSettings, Settings,
},
context::AppContext,
services::{
@ -34,9 +33,13 @@ impl Application {
Ok(Self { port, server })
}
pub async fn run_until_stopped(self) -> Result<(), std::io::Error> { self.server.await }
pub async fn run_until_stopped(self) -> Result<(), std::io::Error> {
self.server.await
}
pub fn port(&self) -> u16 { self.port }
pub fn port(&self) -> u16 {
self.port
}
}
pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result<Server, std::io::Error> {
@ -72,62 +75,63 @@ async fn period_check(_pool: PgPool) {
}
}
fn ws_scope() -> Scope { web::scope("/ws").service(crate::services::web_socket::router::establish_ws_connection) }
fn ws_scope() -> Scope {
web::scope("/ws").service(crate::services::web_socket::router::establish_ws_connection)
}
fn user_scope() -> Scope {
// https://developer.mozilla.org/en-US/docs/Web/HTTP
// TODO: replace GET body with query params
web::scope("/api")
// authentication
.service(web::resource("/auth")
.service(
web::resource("/auth")
.route(web::post().to(user::sign_in_handler))
.route(web::delete().to(user::sign_out_handler))
.route(web::delete().to(user::sign_out_handler)),
)
.service(web::resource("/user")
.service(
web::resource("/user")
.route(web::patch().to(user::set_user_profile_handler))
.route(web::get().to(user::get_user_profile_handler))
.route(web::get().to(user::get_user_profile_handler)),
)
.service(web::resource("/register")
.route(web::post().to(user::register_handler))
)
.service(web::resource("/workspace")
.service(web::resource("/register").route(web::post().to(user::register_handler)))
.service(
web::resource("/workspace")
.route(web::post().to(workspace::create_handler))
.route(web::delete().to(workspace::delete_handler))
.route(web::get().to(workspace::read_handler))
.route(web::patch().to(workspace::update_handler))
.route(web::patch().to(workspace::update_handler)),
)
.service(web::resource("/workspace_list/{user_id}")
.route(web::get().to(workspace::workspace_list))
)
.service(web::resource("/app")
.service(web::resource("/workspace_list/{user_id}").route(web::get().to(workspace::workspace_list)))
.service(
web::resource("/app")
.route(web::post().to(app::create_handler))
.route(web::get().to(app::read_handler))
.route(web::delete().to(app::delete_handler))
.route(web::patch().to(app::update_handler))
.route(web::patch().to(app::update_handler)),
)
.service(web::resource("/view")
.service(
web::resource("/view")
.route(web::post().to(view::create_handler))
.route(web::delete().to(view::delete_handler))
.route(web::get().to(view::read_handler))
.route(web::patch().to(view::update_handler))
.route(web::patch().to(view::update_handler)),
)
.service(web::resource("/doc")
.service(
web::resource("/doc")
.route(web::post().to(doc::create_document_handler))
.route(web::get().to(doc::read_document_handler))
.route(web::patch().to(doc::reset_document_handler))
.route(web::patch().to(doc::reset_document_handler)),
)
.service(web::resource("/trash")
.service(
web::resource("/trash")
.route(web::post().to(trash::create_handler))
.route(web::delete().to(trash::delete_handler))
.route(web::get().to(trash::read_handler))
)
.service(web::resource("/sync")
.route(web::post().to(trash::create_handler))
.route(web::get().to(trash::read_handler)),
)
.service(web::resource("/sync").route(web::post().to(trash::create_handler)))
// password
.service(web::resource("/password_change")
.route(web::post().to(user::change_password))
)
.service(web::resource("/password_change").route(web::post().to(user::change_password)))
}
pub async fn init_app_context(configuration: &Settings) -> AppContext {

View File

@ -49,7 +49,9 @@ impl DatabaseSettings {
.ssl_mode(ssl_mode)
}
pub fn with_db(&self) -> PgConnectOptions { self.without_db().database(&self.database_name) }
pub fn with_db(&self) -> PgConnectOptions {
self.without_db().database(&self.database_name)
}
}
pub fn get_configuration() -> Result<Settings, config::ConfigError> {

View File

@ -1,9 +1,17 @@
use std::env;
pub fn domain() -> String { env::var("DOMAIN").unwrap_or_else(|_| "localhost".to_string()) }
pub fn domain() -> String {
env::var("DOMAIN").unwrap_or_else(|_| "localhost".to_string())
}
pub fn jwt_secret() -> String { env::var("JWT_SECRET").unwrap_or_else(|_| "my secret".into()) }
pub fn jwt_secret() -> String {
env::var("JWT_SECRET").unwrap_or_else(|_| "my secret".into())
}
pub fn secret() -> String { env::var("SECRET_KEY").unwrap_or_else(|_| "0123".repeat(8)) }
pub fn secret() -> String {
env::var("SECRET_KEY").unwrap_or_else(|_| "0123".repeat(8))
}
pub fn use_https() -> bool { false }
pub fn use_https() -> bool {
false
}

View File

@ -56,7 +56,11 @@ pub struct FlowyPersistence {
}
impl FlowyPersistence {
pub fn pg_pool(&self) -> PgPool { self.pg_pool.clone() }
pub fn pg_pool(&self) -> PgPool {
self.pg_pool.clone()
}
pub fn kv_store(&self) -> Arc<DocumentKVPersistence> { self.kv_store.clone() }
pub fn kv_store(&self) -> Arc<DocumentKVPersistence> {
self.kv_store.clone()
}
}

View File

@ -15,7 +15,9 @@ pub struct LoggedUser {
}
impl std::convert::From<Claim> for LoggedUser {
fn from(c: Claim) -> Self { Self { user_id: c.user_id() } }
fn from(c: Claim) -> Self {
Self { user_id: c.user_id() }
}
}
impl LoggedUser {
@ -61,7 +63,7 @@ impl std::convert::TryFrom<&HeaderValue> for LoggedUser {
Err(e) => {
log::error!("Header to string failed: {:?}", e);
Err(ServerError::unauthorized())
},
}
}
}
}
@ -76,27 +78,31 @@ pub const EXPIRED_DURATION_DAYS: i64 = 30;
pub struct AuthorizedUsers(DashMap<LoggedUser, AuthStatus>);
impl std::default::Default for AuthorizedUsers {
fn default() -> Self { Self(DashMap::new()) }
fn default() -> Self {
Self(DashMap::new())
}
}
impl AuthorizedUsers {
pub fn new() -> Self { AuthorizedUsers::default() }
pub fn new() -> Self {
AuthorizedUsers::default()
}
pub fn is_authorized(&self, user: &LoggedUser) -> bool {
match self.0.get(user) {
None => {
tracing::debug!("user not login yet or server was reboot");
false
},
}
Some(status) => match *status {
AuthStatus::Authorized(last_time) => {
let current_time = Utc::now();
let days = (current_time - last_time).num_days();
days < EXPIRED_DURATION_DAYS
},
}
AuthStatus::NotAuthorized => {
tracing::debug!("user logout already");
false
},
}
},
}
}

View File

@ -37,7 +37,9 @@ impl Claim {
}
}
pub fn user_id(self) -> String { self.user_id }
pub fn user_id(self) -> String {
self.user_id
}
}
// impl From<Claim> for User {
@ -48,7 +50,7 @@ impl Claim {
pub struct Token(pub String);
impl Token {
pub fn create_token(user_id: &str) -> Result<Self, ServerError> {
let claims = Claim::with_user_id(&user_id);
let claims = Claim::with_user_id(user_id);
encode(
&Header::new(DEFAULT_ALGORITHM),
&claims,

View File

@ -7,6 +7,7 @@ pub struct UserTable {
pub(crate) id: uuid::Uuid,
pub(crate) email: String,
pub(crate) name: String,
#[allow(dead_code)]
pub(crate) create_time: chrono::DateTime<Utc>,
pub(crate) password: String,
}

View File

@ -1,9 +1,7 @@
use actix_service::{Service, Transform};
use actix_web::{
dev::{ServiceRequest, ServiceResponse},
Error,
HttpResponse,
ResponseError,
Error, HttpResponse, ResponseError,
};
use crate::{
@ -34,7 +32,9 @@ where
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future { ok(AuthenticationMiddleware { service }) }
fn new_transform(&self, service: S) -> Self::Future {
ok(AuthenticationMiddleware { service })
}
}
pub struct AuthenticationMiddleware<S> {
service: S,
@ -51,7 +51,9 @@ where
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.service.poll_ready(cx) }
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let mut authenticate_pass: bool = false;
@ -77,7 +79,7 @@ where
AUTHORIZED_USERS.store_auth(logged_user, true);
}
}
},
}
Err(e) => log::error!("{:?}", e),
}
} else {

View File

@ -101,7 +101,7 @@ fn default_color_style() -> Vec<u8> {
Err(e) => {
log::error!("Serialize color style failed: {:?}", e);
vec![]
},
}
}
}
@ -121,6 +121,7 @@ pub struct AppTable {
pub(crate) last_view_id: String,
pub(crate) modified_time: chrono::DateTime<Utc>,
pub(crate) create_time: chrono::DateTime<Utc>,
#[allow(dead_code)]
pub(crate) user_id: String,
}
impl std::convert::From<AppTable> for AppPB {

View File

@ -73,7 +73,7 @@ pub async fn update_handler(payload: Payload, pool: Data<PgPool>) -> Result<Http
true => {
let color_bytes = params.get_color_style().write_to_bytes()?;
Some(color_bytes)
},
}
};
let desc = match params.has_desc() {

View File

@ -6,6 +6,7 @@ pub(crate) const TRASH_TABLE: &str = "trash_table";
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct TrashTable {
pub(crate) id: uuid::Uuid,
#[allow(dead_code)]
pub(crate) user_id: String,
pub(crate) ty: i32,
}

View File

@ -117,13 +117,13 @@ async fn delete_trash_associate_targets(
match TrashType::from_i32(ty) {
None => log::error!("Parser trash type with value: {} failed", ty),
Some(ty) => match ty {
TrashType::Unknown => {},
TrashType::Unknown => {}
TrashType::View => {
let _ = delete_view(transaction as &mut DBTransaction<'_>, kv_store, vec![id]).await;
},
}
TrashType::App => {
let _ = delete_app(transaction as &mut DBTransaction<'_>, id).await;
},
}
},
}
}
@ -164,13 +164,13 @@ pub(crate) async fn read_trash(
match TrashType::from_i32(table.ty) {
None => log::error!("Parser trash type with value: {} failed", table.ty),
Some(ty) => match ty {
TrashType::Unknown => {},
TrashType::Unknown => {}
TrashType::View => {
trash.push(read_view_table(table.id, transaction).await?.into());
},
}
TrashType::App => {
trash.push(read_app_table(table.id, transaction).await?.into());
},
}
},
}
}

View File

@ -116,7 +116,7 @@ pub(crate) async fn read_view(
let mut views = RepeatedViewPB::default();
views.set_items(
read_view_belong_to_id(&table.id.to_string(), &user, transaction)
read_view_belong_to_id(&table.id.to_string(), user, transaction)
.await?
.into(),
);

View File

@ -2,11 +2,9 @@ use crate::{
context::FlowyPersistence,
entities::logged_user::LoggedUser,
services::core::view::{
create_view,
delete_view,
create_view, delete_view,
persistence::{check_view_id, check_view_ids},
read_view,
update_view,
read_view, update_view,
},
util::serde_ext::parse_from_payload,
};
@ -22,10 +20,8 @@ use backend_service::{
use flowy_core_data_model::{
parser::view::{ViewDesc, ViewName, ViewThumbnail},
protobuf::{
CreateViewParams as CreateViewParamsPB,
QueryViewRequest as QueryViewRequestPB,
UpdateViewParams as UpdateViewParamsPB,
ViewId as ViewIdPB,
CreateViewParams as CreateViewParamsPB, QueryViewRequest as QueryViewRequestPB,
UpdateViewParams as UpdateViewParamsPB, ViewId as ViewIdPB,
},
};
use sqlx::PgPool;

View File

@ -1,11 +1,7 @@
use crate::{
entities::logged_user::LoggedUser,
services::core::workspace::{
create_workspace,
delete_workspace,
persistence::check_workspace_id,
read_workspaces,
update_workspace,
create_workspace, delete_workspace, persistence::check_workspace_id, read_workspaces, update_workspace,
},
util::serde_ext::parse_from_payload,
};
@ -21,8 +17,7 @@ use backend_service::{
use flowy_core_data_model::{
parser::workspace::{WorkspaceDesc, WorkspaceName},
protobuf::{
CreateWorkspaceParams as CreateWorkspaceParamsPB,
UpdateWorkspaceParams as UpdateWorkspaceParamsPB,
CreateWorkspaceParams as CreateWorkspaceParamsPB, UpdateWorkspaceParams as UpdateWorkspaceParamsPB,
WorkspaceId as WorkspaceIdPB,
},
};
@ -110,7 +105,7 @@ pub async fn update_handler(
.map_err(invalid_params)?
.0;
Some(name)
},
}
};
let desc = match params.has_desc() {
@ -120,7 +115,7 @@ pub async fn update_handler(
.map_err(invalid_params)?
.0;
Some(desc)
},
}
};
let mut transaction = pool

View File

@ -7,11 +7,7 @@ use backend_service::errors::{internal_error, ServerError};
use bytes::Bytes;
use flowy_collaboration::{
protobuf::{
CreateDocParams,
DocumentId,
DocumentInfo,
RepeatedRevision as RepeatedRevisionPB,
ResetDocumentParams,
CreateDocParams, DocumentId, DocumentInfo, RepeatedRevision as RepeatedRevisionPB, ResetDocumentParams,
Revision as RevisionPB,
},
sync::ServerDocumentManager,
@ -71,15 +67,21 @@ pub struct DocumentKVPersistence {
impl std::ops::Deref for DocumentKVPersistence {
type Target = Arc<KVStore>;
fn deref(&self) -> &Self::Target { &self.inner }
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl std::ops::DerefMut for DocumentKVPersistence {
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner }
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl DocumentKVPersistence {
pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
pub(crate) fn new(kv_store: Arc<KVStore>) -> Self {
DocumentKVPersistence { inner: kv_store }
}
pub(crate) async fn batch_set_revision(&self, revisions: Vec<RevisionPB>) -> Result<(), ServerError> {
let items = revisions_to_key_value_items(revisions)?;
@ -109,7 +111,7 @@ impl DocumentKVPersistence {
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
.await?
},
}
Some(rev_ids) => {
let keys = rev_ids
.into_iter()
@ -119,7 +121,7 @@ impl DocumentKVPersistence {
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_get(keys).await }))
.await?
},
}
};
Ok(key_value_items_to_revisions(items))
@ -136,7 +138,7 @@ impl DocumentKVPersistence {
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&doc_id).await }))
.await
},
}
Some(rev_ids) => {
let keys = rev_ids
.into_iter()
@ -146,7 +148,7 @@ impl DocumentKVPersistence {
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await }))
.await
},
}
}
}
}
@ -181,7 +183,9 @@ fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevisionPB {
}
#[inline]
fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }
fn make_revision_key(doc_id: &str, rev_id: i64) -> String {
format!("{}:{}", doc_id, rev_id)
}
#[inline]
fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevisionPB) -> Result<DocumentInfo, ServerError> {

View File

@ -10,9 +10,7 @@ use actix_web::{
use backend_service::{errors::ServerError, response::FlowyResponse};
use flowy_collaboration::{
protobuf::{
CreateDocParams as CreateDocParamsPB,
DocumentId as DocumentIdPB,
ResetDocumentParams as ResetDocumentParamsPB,
CreateDocParams as CreateDocParamsPB, DocumentId as DocumentIdPB, ResetDocumentParams as ResetDocumentParamsPB,
},
sync::ServerDocumentManager,
};

View File

@ -9,8 +9,7 @@ use backend_service::errors::{internal_error, Result, ServerError};
use flowy_collaboration::{
protobuf::{
DocumentClientWSData as DocumentClientWSDataPB,
DocumentClientWSDataType as DocumentClientWSDataTypePB,
DocumentClientWSData as DocumentClientWSDataPB, DocumentClientWSDataType as DocumentClientWSDataTypePB,
Revision as RevisionPB,
},
sync::{RevisionUser, ServerDocumentManager, SyncResponse},
@ -66,7 +65,7 @@ impl DocumentWebSocketActor {
ret,
} => {
let _ = ret.send(self.handle_client_data(client_data, persistence).await);
},
}
}
}
@ -96,14 +95,14 @@ impl DocumentWebSocketActor {
.handle_client_revisions(user, document_client_data)
.await
.map_err(internal_error)?;
},
}
DocumentClientWSDataTypePB::ClientPing => {
let _ = self
.doc_manager
.handle_client_ping(user, document_client_data)
.await
.map_err(internal_error)?;
},
}
}
Ok(())
@ -135,37 +134,39 @@ impl std::fmt::Debug for ServerDocUser {
}
impl RevisionUser for ServerDocUser {
fn user_id(&self) -> String { self.user.id().to_string() }
fn user_id(&self) -> String {
self.user.id().to_string()
}
fn receive(&self, resp: SyncResponse) {
let result = match resp {
SyncResponse::Pull(data) => {
let msg: WebSocketMessage = data.into();
self.socket.try_send(msg).map_err(internal_error)
},
}
SyncResponse::Push(data) => {
let msg: WebSocketMessage = data.into();
self.socket.try_send(msg).map_err(internal_error)
},
}
SyncResponse::Ack(data) => {
let msg: WebSocketMessage = data.into();
self.socket.try_send(msg).map_err(internal_error)
},
}
SyncResponse::NewRevision(mut repeated_revision) => {
let kv_store = self.persistence.kv_store();
tokio::task::spawn(async move {
let revisions = repeated_revision.take_items().into();
match kv_store.batch_set_revision(revisions).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("{}", e),
}
});
Ok(())
},
}
};
match result {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("[ServerDocUser]: {}", e),
}
}

View File

@ -13,9 +13,7 @@ use flowy_collaboration::{
entities::doc::DocumentInfo,
errors::CollaborateError,
protobuf::{
CreateDocParams as CreateDocParamsPB,
DocumentId,
RepeatedRevision as RepeatedRevisionPB,
CreateDocParams as CreateDocParamsPB, DocumentId, RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB,
},
sync::{DocumentPersistence, ServerDocumentManager},
@ -65,11 +63,11 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
};
match sender.send(msg).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("{}", e),
}
match rx.await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("{:?}", e),
};
});
@ -78,7 +76,9 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
pub struct DocumentPersistenceImpl(pub Arc<FlowyPersistence>);
impl Debug for DocumentPersistenceImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("DocumentPersistenceImpl")
}
}
impl DocumentPersistence for DocumentPersistenceImpl {

View File

@ -11,11 +11,7 @@ use lib_infra::future::BoxResultFuture;
use sql_builder::SqlBuilder as RawSqlBuilder;
use sqlx::{
postgres::{PgArguments, PgRow},
Arguments,
Error,
PgPool,
Postgres,
Row,
Arguments, Error, PgPool, Postgres, Row,
};
const KV_TABLE: &str = "kv_table";
@ -208,6 +204,7 @@ fn rows_to_key_values(rows: Vec<PgRow>) -> Vec<KeyValue> {
#[derive(Debug, Clone, sqlx::FromRow)]
struct KVTable {
#[allow(dead_code)]
pub(crate) id: String,
pub(crate) blob: Vec<u8>,
}

View File

@ -18,12 +18,8 @@ use chrono::Utc;
use flowy_user_data_model::{
parser::{UserEmail, UserName, UserPassword},
protobuf::{
SignInParams as SignInParamsPB,
SignInResponse as SignInResponsePB,
SignUpParams as SignUpParamsPB,
SignUpResponse as SignUpResponsePB,
UpdateUserParams as UpdateUserParamsPB,
UserProfile as UserProfilePB,
SignInParams as SignInParamsPB, SignInResponse as SignInResponsePB, SignUpParams as SignUpParamsPB,
SignUpResponse as SignUpResponsePB, UpdateUserParams as UpdateUserParamsPB, UserProfile as UserProfilePB,
},
};
use sqlx::{PgPool, Postgres};
@ -150,7 +146,7 @@ pub(crate) async fn set_user_profile(
let password = UserPassword::parse(params.get_password().to_owned()).map_err(invalid_params)?;
let password = hash_password(password.as_ref())?;
Some(password)
},
}
};
let (sql, args) = SqlBuilder::update("user_table")
@ -200,7 +196,7 @@ async fn check_user_password(
.await
.map_err(|err| ServerError::internal().context(err))?;
match verify_password(&password, &user.password) {
match verify_password(password, &user.password) {
Ok(true) => Ok(user),
_ => Err(ServerError::password_not_match()),
}

View File

@ -6,14 +6,11 @@ use crate::{
use actix_identity::Identity;
use actix_web::{
web::{Data, Payload},
HttpRequest,
HttpResponse,
HttpRequest, HttpResponse,
};
use backend_service::{errors::ServerError, response::FlowyResponse};
use flowy_user_data_model::protobuf::{
SignInParams as SignInParamsPB,
SignUpParams as SignUpParamsPB,
UpdateUserParams as UpdateUserParamsPB,
SignInParams as SignInParamsPB, SignUpParams as SignUpParamsPB, UpdateUserParams as UpdateUserParamsPB,
};
use sqlx::PgPool;

View File

@ -10,13 +10,15 @@ pub type Socket = Recipient<WebSocketMessage>;
pub struct SessionId(pub String);
impl<T: AsRef<str>> std::convert::From<T> for SessionId {
fn from(s: T) -> Self { SessionId(s.as_ref().to_owned()) }
fn from(s: T) -> Self {
SessionId(s.as_ref().to_owned())
}
}
impl std::fmt::Display for SessionId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let desc = &self.0.to_string();
f.write_str(&desc)
f.write_str(desc)
}
}

View File

@ -11,7 +11,9 @@ pub struct WebSocketMessage(pub Bytes);
impl std::ops::Deref for WebSocketMessage {
type Target = Bytes;
fn deref(&self) -> &Self::Target { &self.0 }
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::convert::From<DocumentClientWSData> for WebSocketMessage {

View File

@ -6,9 +6,7 @@ use actix::Addr;
use actix_web::{
get,
web::{Data, Path, Payload},
Error,
HttpRequest,
HttpResponse,
Error, HttpRequest, HttpResponse,
};
use actix_web_actors::ws;

View File

@ -3,8 +3,7 @@ use crate::{
entities::logged_user::LoggedUser,
services::web_socket::{
entities::{Connect, Disconnect, Socket},
WSServer,
WebSocketMessage,
WSServer, WebSocketMessage,
},
};
use actix::*;
@ -18,22 +17,23 @@ pub trait WebSocketReceiver: Send + Sync {
fn receive(&self, data: WSClientData);
}
#[derive(Default)]
pub struct WebSocketReceivers {
inner: HashMap<WSModule, Arc<dyn WebSocketReceiver>>,
}
impl std::default::Default for WebSocketReceivers {
fn default() -> Self { Self { inner: HashMap::new() } }
}
impl WebSocketReceivers {
pub fn new() -> Self { WebSocketReceivers::default() }
pub fn new() -> Self {
WebSocketReceivers::default()
}
pub fn set(&mut self, source: WSModule, receiver: Arc<dyn WebSocketReceiver>) {
self.inner.insert(source, receiver);
}
pub fn get(&self, source: &WSModule) -> Option<Arc<dyn WebSocketReceiver>> { self.inner.get(source).cloned() }
pub fn get(&self, source: &WSModule) -> Option<Arc<dyn WebSocketReceiver>> {
self.inner.get(source).cloned()
}
}
#[derive(Debug)]
@ -42,9 +42,13 @@ pub struct WSUser {
}
impl WSUser {
pub fn new(inner: LoggedUser) -> Self { Self { inner } }
pub fn new(inner: LoggedUser) -> Self {
Self { inner }
}
pub fn id(&self) -> &str { &self.inner.user_id }
pub fn id(&self) -> &str {
&self.inner.user_id
}
}
pub struct WSClientData {
@ -89,7 +93,7 @@ impl WSClient {
match self.ws_receivers.get(&message.module) {
None => {
log::error!("Can't find the receiver for {:?}", message.module);
},
}
Some(handler) => {
let client_data = WSClientData {
user: self.user.clone(),
@ -97,7 +101,7 @@ impl WSClient {
data: Bytes::from(message.data),
};
handler.receive(client_data);
},
}
}
}
}
@ -108,28 +112,28 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
},
}
Ok(ws::Message::Pong(_msg)) => {
// tracing::debug!("Receive {} pong {:?}", &self.session_id, &msg);
self.hb = Instant::now();
},
}
Ok(ws::Message::Binary(bytes)) => {
let socket = ctx.address().recipient();
self.handle_binary_message(bytes, socket);
},
}
Ok(Text(_)) => {
log::warn!("Receive unexpected text message");
},
}
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();
},
Ok(ws::Message::Continuation(_)) => {},
Ok(ws::Message::Nop) => {},
}
Ok(ws::Message::Continuation(_)) => {}
Ok(ws::Message::Nop) => {}
Err(e) => {
log::error!("[{}]: WebSocketStream protocol error {:?}", self.user.id(), e);
ctx.stop();
},
}
}
}
}
@ -137,7 +141,9 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
impl Handler<WebSocketMessage> for WSClient {
type Result = ();
fn handle(&mut self, msg: WebSocketMessage, ctx: &mut Self::Context) { ctx.binary(msg.0); }
fn handle(&mut self, msg: WebSocketMessage, ctx: &mut Self::Context) {
ctx.binary(msg.0);
}
}
impl Actor for WSClient {

View File

@ -18,9 +18,13 @@ impl std::default::Default for WSServer {
}
}
impl WSServer {
pub fn new() -> Self { WSServer::default() }
pub fn new() -> Self {
WSServer::default()
}
pub fn send(&self, _msg: WebSocketMessage) { unimplemented!() }
pub fn send(&self, _msg: WebSocketMessage) {
unimplemented!()
}
}
impl Actor for WSServer {
@ -49,7 +53,9 @@ impl Handler<Disconnect> for WSServer {
impl Handler<WebSocketMessage> for WSServer {
type Result = ();
fn handle(&mut self, _msg: WebSocketMessage, _ctx: &mut Context<Self>) -> Self::Result { unimplemented!() }
fn handle(&mut self, _msg: WebSocketMessage, _ctx: &mut Context<Self>) -> Self::Result {
unimplemented!()
}
}
impl actix::Supervised for WSServer {

View File

@ -22,7 +22,7 @@ pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
}
pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, ServerError> {
let result: ProtobufResult<T> = Message::parse_from_bytes(&bytes);
let result: ProtobufResult<T> = Message::parse_from_bytes(bytes);
match result {
Ok(data) => Ok(data),
Err(e) => Err(e.into()),

View File

@ -117,7 +117,7 @@ impl SqlBuilder {
let sql = inner.sql()?;
Ok((sql, self.fields_args))
},
}
BuilderType::Select => {
let mut inner = InnerBuilder::select_from(&self.table);
self.fields.into_iter().for_each(|field| {
@ -130,7 +130,7 @@ impl SqlBuilder {
let sql = inner.sql()?;
Ok((sql, self.fields_args))
},
}
BuilderType::Update => {
let mut inner = InnerBuilder::update_table(&self.table);
let field_len = self.fields.len();
@ -145,7 +145,7 @@ impl SqlBuilder {
let sql = inner.sql()?;
Ok((sql, self.fields_args))
},
}
BuilderType::Delete => {
let mut inner = InnerBuilder::delete_from(&self.table);
self.filters.into_iter().enumerate().for_each(|(index, filter)| {
@ -153,7 +153,7 @@ impl SqlBuilder {
});
let sql = inner.sql()?;
Ok((sql, self.fields_args))
},
}
}
}
}

View File

@ -2,7 +2,9 @@ use backend_service::errors::{ErrorCode, ServerError};
use bcrypt::{hash, verify, DEFAULT_COST};
#[allow(dead_code)]
pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() }
pub fn uuid() -> String {
uuid::Uuid::new_v4().to_string()
}
pub fn hash_password(plain: &str) -> Result<String, ServerError> {
let hashing_cost = std::env::var("HASH_COST")

View File

@ -76,10 +76,10 @@ async fn user_update_password() {
};
match server.sign_in(sign_in_params).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
assert_eq!(e.code, ErrorCode::PasswordNotMatch);
},
}
}
}

View File

@ -45,9 +45,13 @@ impl TestUserServer {
let _ = user_sign_out_request(self.user_token(), &url).await.unwrap();
}
pub fn user_token(&self) -> &str { self.user_token.as_ref().expect("must call register_user first ") }
pub fn user_token(&self) -> &str {
self.user_token.as_ref().expect("must call register_user first ")
}
pub fn user_id(&self) -> &str { self.user_id.as_ref().expect("must call register_user first ") }
pub fn user_id(&self) -> &str {
self.user_id.as_ref().expect("must call register_user first ")
}
pub async fn get_user_profile(&self) -> UserProfile {
let url = format!("{}/api/user", self.http_addr());
@ -178,7 +182,9 @@ impl TestUserServer {
response
}
pub fn http_addr(&self) -> String { self.inner.client_server_config.base_url() }
pub fn http_addr(&self) -> String {
self.inner.client_server_config.base_url()
}
pub fn ws_addr(&self) -> String {
format!(
@ -336,7 +342,9 @@ impl WorkspaceTest {
Self { server, workspace }
}
pub async fn create_app(&self) -> App { create_test_app(&self.server, &self.workspace.id).await }
pub async fn create_app(&self) -> App {
create_test_app(&self.server, &self.workspace.id).await
}
}
pub struct AppTest {

View File

@ -34,8 +34,8 @@ yay -S curl base-devel sqlite openssl clang cmake ninja pkg-config gtk3 unzip
```shell
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
source $HOME/.cargo/env
rustup toolchain install nightly
rustup default nightly
rustup toolchain install stable
rustup default stable
```
3. Install flutter according to https://docs.flutter.dev/get-started/install/linux

View File

@ -32,7 +32,7 @@ flutter doctor
```shell
# Download rustup.exe from https://win.rustup.rs/x86_64
# Call rustup.exe from powershell or cmd
.\rustup-init.exe --default-toolchain nightly --default-host x86_64-pc-windows-msvc -y
.\rustup-init.exe --default-toolchain stable --default-host x86_64-pc-windows-msvc -y
# Note: you probably need to re-open termial to get cargo command be available in PATH var
```
5. Install cargo make

View File

@ -9,4 +9,4 @@ install_cargo_make:
install_rust:
brew bundle
rustup-init -y --default-toolchain=nightly
rustup-init -y --default-toolchain=stable

View File

@ -59,4 +59,4 @@ windows/flutter/dart_ffi/
**/**/*.lib
**/**/*.dll
**/**/*.so
**/Brewfile.lock.json
**/**/Brewfile.lock.json

View File

@ -18,7 +18,9 @@ lazy_static! {
static ref FLOWY_SDK: RwLock<Option<Arc<FlowySDK>>> = RwLock::new(None);
}
fn dispatch() -> Arc<EventDispatcher> { FLOWY_SDK.read().as_ref().unwrap().dispatcher() }
fn dispatch() -> Arc<EventDispatcher> {
FLOWY_SDK.read().as_ref().unwrap().dispatcher()
}
#[no_mangle]
pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
@ -85,13 +87,13 @@ async fn post_to_flutter(response: EventResponse, port: i64) {
{
Ok(_success) => {
log::trace!("[FFI]: Post data to dart success");
},
}
Err(e) => {
if let Some(msg) = e.downcast_ref::<&str>() {
log::error!("[FFI]: {:?}", msg);
} else {
log::error!("[FFI]: allo_isolate post panic");
}
},
}
}
}

View File

@ -22,5 +22,7 @@ impl FFIRequest {
}
impl std::convert::From<FFIRequest> for ModuleRequest {
fn from(ffi_request: FFIRequest) -> Self { ModuleRequest::new(ffi_request.event).payload(ffi_request.payload) }
fn from(ffi_request: FFIRequest) -> Self {
ModuleRequest::new(ffi_request.event).payload(ffi_request.payload)
}
}

View File

@ -9,7 +9,9 @@ pub enum FFIStatusCode {
}
impl std::default::Default for FFIStatusCode {
fn default() -> FFIStatusCode { FFIStatusCode::Ok }
fn default() -> FFIStatusCode {
FFIStatusCode::Ok
}
}
#[derive(ProtoBuf, Default)]

View File

@ -13,7 +13,9 @@ pub struct DartStreamSender {
}
impl DartStreamSender {
fn new() -> Self { Self { isolate: None } }
fn new() -> Self {
Self { isolate: None }
}
fn inner_set_port(&mut self, port: i64) {
log::info!("Setup rust to flutter stream with port {}", port);
@ -27,7 +29,7 @@ impl DartStreamSender {
let bytes: Bytes = observable_subject.try_into().unwrap();
isolate.post(bytes.to_vec());
Ok(())
},
}
None => Err("Isolate is not set".to_owned()),
}
}
@ -38,7 +40,7 @@ impl DartStreamSender {
Err(e) => {
let msg = format!("Get rust to flutter stream lock fail. {:?}", e);
log::error!("{:?}", msg);
},
}
}
}

View File

@ -34,7 +34,7 @@ impl DartNotifyBuilder {
Ok(bytes) => self.payload = Some(bytes),
Err(e) => {
log::error!("Set observable payload failed: {:?}", e);
},
}
}
self
@ -48,7 +48,7 @@ impl DartNotifyBuilder {
Ok(bytes) => self.error = Some(bytes),
Err(e) => {
log::error!("Set observable error failed: {:?}", e);
},
}
}
self
}
@ -67,7 +67,7 @@ impl DartNotifyBuilder {
};
match DartStreamSender::post(subject) {
Ok(_) => {},
Ok(_) => {}
Err(error) => log::error!("Send observable subject failed: {}", error),
}
}

View File

@ -57,10 +57,10 @@ impl CoreContext {
pub fn network_state_changed(&self, new_type: NetworkType) {
match new_type {
NetworkType::UnknownNetworkType => {},
NetworkType::Wifi => {},
NetworkType::Cell => {},
NetworkType::Ethernet => {},
NetworkType::UnknownNetworkType => {}
NetworkType::Wifi => {}
NetworkType::Cell => {}
NetworkType::Ethernet => {}
}
}

View File

@ -3,8 +3,7 @@ use crate::{
errors::FlowyError,
notify::{send_dart_notification, WorkspaceNotification},
services::{
get_current_workspace,
read_local_workspace_apps,
get_current_workspace, read_local_workspace_apps,
workspace::sql::{WorkspaceTable, WorkspaceTableSql},
},
};
@ -86,14 +85,14 @@ fn read_workspaces_on_server(
for app in apps {
let views = app.belongings.clone().into_inner();
match app_ctrl.save_app(app, &*conn) {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("create app failed: {:?}", e),
}
tracing::debug!("Save {} views", views.len());
for view in views {
match view_ctrl.save_view(view, &*conn) {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("create view failed: {:?}", e),
}
}

View File

@ -15,15 +15,21 @@ macro_rules! impl_def_and_def_mut {
impl std::ops::Deref for $target {
type Target = Vec<$item>;
fn deref(&self) -> &Self::Target { &self.items }
fn deref(&self) -> &Self::Target {
&self.items
}
}
impl std::ops::DerefMut for $target {
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.items }
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.items
}
}
impl $target {
#[allow(dead_code)]
pub fn into_inner(&mut self) -> Vec<$item> { ::std::mem::replace(&mut self.items, vec![]) }
pub fn into_inner(&mut self) -> Vec<$item> {
::std::mem::replace(&mut self.items, vec![])
}
#[allow(dead_code)]
pub fn push(&mut self, item: $item) {
@ -35,7 +41,9 @@ macro_rules! impl_def_and_def_mut {
self.items.push(item);
}
pub fn first_or_crash(&self) -> &$item { self.items.first().unwrap() }
pub fn first_or_crash(&self) -> &$item {
self.items.first().unwrap()
}
}
};
}

View File

@ -6,15 +6,8 @@ use crate::{
event::WorkspaceEvent,
event_handler::*,
services::{
app::event_handler::*,
server::construct_workspace_server,
trash::event_handler::*,
view::event_handler::*,
workspace::event_handler::*,
AppController,
TrashController,
ViewController,
WorkspaceController,
app::event_handler::*, server::construct_workspace_server, trash::event_handler::*, view::event_handler::*,
workspace::event_handler::*, AppController, TrashController, ViewController, WorkspaceController,
},
};
use backend_service::configuration::ClientServerConfiguration;

View File

@ -22,11 +22,15 @@ pub(crate) enum WorkspaceNotification {
}
impl std::default::Default for WorkspaceNotification {
fn default() -> Self { WorkspaceNotification::Unknown }
fn default() -> Self {
WorkspaceNotification::Unknown
}
}
impl std::convert::From<WorkspaceNotification> for i32 {
fn from(notification: WorkspaceNotification) -> Self { notification as i32 }
fn from(notification: WorkspaceNotification) -> Self {
notification as i32
}
}
#[tracing::instrument(level = "debug")]

View File

@ -9,8 +9,7 @@ use crate::{
services::{
app::sql::{AppTable, AppTableChangeset, AppTableSql},
server::Server,
TrashController,
TrashEvent,
TrashController, TrashEvent,
},
};
use flowy_database::SqliteConnection;
@ -125,11 +124,11 @@ impl AppController {
let server = self.server.clone();
tokio::spawn(async move {
match server.update_app(&token, params).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
// TODO: retry?
log::error!("Update app failed: {:?}", e);
},
}
}
});
Ok(())
@ -152,13 +151,13 @@ impl AppController {
send_dart_notification(&app.id, WorkspaceNotification::AppUpdated)
.payload(app)
.send();
},
}
Err(e) => log::error!("Save app failed: {:?}", e),
}
},
}
Err(e) => log::error!("Require db connection failed: {:?}", e),
},
Ok(None) => {},
Ok(None) => {}
Err(e) => log::error!("Read app failed: {:?}", e),
}
});
@ -202,7 +201,7 @@ async fn handle_trash_event(database: Arc<dyn WorkspaceDatabase>, trash_can: Arc
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
},
}
TrashEvent::Delete(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
@ -222,7 +221,7 @@ async fn handle_trash_event(database: Arc<dyn WorkspaceDatabase>, trash_can: Arc
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
},
}
}
}

View File

@ -26,7 +26,7 @@ impl AppTableSql {
_ => {
let changeset = AppTableChangeset::from_table(app_table);
diesel_update_table!(app_table, changeset, conn)
},
}
}
Ok(())
}
@ -145,7 +145,9 @@ impl std::convert::From<ColorStyle> for ColorStyleCol {
impl std::convert::TryInto<Vec<u8>> for &ColorStyleCol {
type Error = String;
fn try_into(self) -> Result<Vec<u8>, Self::Error> { bincode::serialize(self).map_err(|e| format!("{:?}", e)) }
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
bincode::serialize(self).map_err(|e| format!("{:?}", e))
}
}
impl std::convert::TryFrom<&[u8]> for ColorStyleCol {

View File

@ -17,7 +17,9 @@ pub struct WorkspaceHttpServer {
}
impl WorkspaceHttpServer {
pub fn new(config: ClientServerConfiguration) -> WorkspaceHttpServer { Self { config } }
pub fn new(config: ClientServerConfiguration) -> WorkspaceHttpServer {
Self { config }
}
}
impl WorkspaceServerAPI for WorkspaceHttpServer {

View File

@ -29,7 +29,9 @@ impl TrashController {
}
}
pub(crate) fn init(&self) -> Result<(), FlowyError> { Ok(()) }
pub(crate) fn init(&self) -> Result<(), FlowyError> {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> {
@ -112,9 +114,9 @@ impl TrashController {
let _ = self.notify.send(TrashEvent::Delete(trash_identifiers.clone(), tx));
match rx.recv().await {
None => {},
None => {}
Some(result) => match result {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("{}", e),
},
}
@ -171,7 +173,9 @@ impl TrashController {
Ok(())
}
pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> { self.notify.subscribe() }
pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> {
self.notify.subscribe()
}
pub fn read_trash(&self, conn: &SqliteConnection) -> Result<RepeatedTrash, FlowyError> {
let repeated_trash = TrashTableSql::read_all(&*conn)?;
@ -198,7 +202,7 @@ impl TrashController {
// TODO: retry?
let _ = tokio::spawn(async move {
match server.create_trash(&token, trash_identifiers).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("Create trash failed: {:?}", e),
}
});
@ -212,7 +216,7 @@ impl TrashController {
let server = self.server.clone();
let _ = tokio::spawn(async move {
match server.delete_trash(&token, trash_identifiers).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("Delete trash failed: {:?}", e),
}
});
@ -239,13 +243,13 @@ impl TrashController {
match result {
Ok(repeated_trash) => {
notify_trash_changed(repeated_trash);
},
}
Err(e) => log::error!("Save trash failed: {:?}", e),
}
},
}
Err(e) => log::error!("Require db connection failed: {:?}", e),
}
},
}
Err(e) => log::error!("Read trash failed: {:?}", e),
}
});
@ -295,7 +299,7 @@ impl TrashEvent {
} else {
Some(TrashEvent::Putback(identifiers, sender))
}
},
}
TrashEvent::Delete(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
@ -303,7 +307,7 @@ impl TrashEvent {
} else {
Some(TrashEvent::Delete(identifiers, sender))
}
},
}
TrashEvent::NewTrash(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
@ -311,7 +315,7 @@ impl TrashEvent {
} else {
Some(TrashEvent::NewTrash(identifiers, sender))
}
},
}
}
}
}

View File

@ -20,7 +20,7 @@ impl TrashTableSql {
_ => {
let changeset = TrashTableChangeset::from(trash_table);
diesel_update_table!(trash_table, changeset, conn)
},
}
}
}

View File

@ -18,8 +18,7 @@ use crate::{
services::{
server::Server,
view::sql::{ViewTable, ViewTableChangeset, ViewTableSql},
TrashController,
TrashEvent,
TrashController, TrashEvent,
},
};
use flowy_core_data_model::entities::share::{ExportData, ExportParams};
@ -84,7 +83,7 @@ impl ViewController {
conn.immediate_transaction::<_, FlowyError, _>(|| {
let belong_to_id = view.belong_to_id.clone();
let _ = self.save_view(view, conn)?;
let _ = notify_views_changed(&belong_to_id, trash_can, &conn)?;
let _ = notify_views_changed(&belong_to_id, trash_can, conn)?;
Ok(())
})?;
@ -227,11 +226,13 @@ impl ViewController {
let conn = self.database.db_connection()?;
let view_table = ViewTableSql::read_view(&view_id, &*conn)?;
Ok(Some(view_table.into()))
},
}
}
}
pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); }
pub(crate) fn set_latest_view(&self, view: &View) {
KV::set_str(LATEST_VIEW_ID, view.id.clone());
}
}
impl ViewController {
@ -248,11 +249,11 @@ impl ViewController {
let server = self.server.clone();
tokio::spawn(async move {
match server.update_view(&token, params).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
// TODO: retry?
log::error!("Update view failed: {:?}", e);
},
}
}
});
Ok(())
@ -275,13 +276,13 @@ impl ViewController {
send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
.payload(view.clone())
.send();
},
}
Err(e) => log::error!("Save view failed: {:?}", e),
}
},
}
Err(e) => log::error!("Require db connection failed: {:?}", e),
},
Ok(None) => {},
Ok(None) => {}
Err(e) => log::error!("Read view failed: {:?}", e),
}
});
@ -331,7 +332,7 @@ async fn handle_trash_event(
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
},
}
TrashEvent::Putback(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
@ -343,7 +344,7 @@ async fn handle_trash_event(
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
},
}
TrashEvent::Delete(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
@ -365,7 +366,7 @@ async fn handle_trash_event(
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
},
}
}
}
@ -394,7 +395,7 @@ fn notify_views_changed(
) -> FlowyResult<()> {
let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), conn)?;
tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
send_dart_notification(belong_to_id, WorkspaceNotification::AppViewsChanged)
.payload(repeated_view)
.send();
Ok(())

View File

@ -2,14 +2,8 @@ use crate::{
entities::{
trash::Trash,
view::{
CreateViewParams,
CreateViewRequest,
QueryViewRequest,
RepeatedViewId,
UpdateViewParams,
UpdateViewRequest,
View,
ViewId,
CreateViewParams, CreateViewRequest, QueryViewRequest, RepeatedViewId, UpdateViewParams, UpdateViewRequest,
View, ViewId,
},
},
errors::FlowyError,

View File

@ -23,7 +23,7 @@ impl ViewTableSql {
_ => {
let changeset = ViewTableChangeset::from_table(view_table);
diesel_update_table!(view_table, changeset, conn)
},
}
}
Ok(())
}
@ -219,7 +219,9 @@ pub enum ViewTableType {
}
impl std::default::Default for ViewTableType {
fn default() -> Self { ViewTableType::Docs }
fn default() -> Self {
ViewTableType::Docs
}
}
impl std::convert::From<i32> for ViewTableType {
@ -229,13 +231,15 @@ impl std::convert::From<i32> for ViewTableType {
o => {
log::error!("Unsupported view type {}, fallback to ViewType::Docs", o);
ViewTableType::Docs
},
}
}
}
}
impl ViewTableType {
pub fn value(&self) -> i32 { *self as i32 }
pub fn value(&self) -> i32 {
*self as i32
}
}
impl_sql_integer_expression!(ViewTableType);

View File

@ -35,7 +35,9 @@ impl WorkspaceController {
}
}
pub(crate) fn init(&self) -> Result<(), FlowyError> { Ok(()) }
pub(crate) fn init(&self) -> Result<(), FlowyError> {
Ok(())
}
pub(crate) async fn create_workspace_from_params(
&self,
@ -124,7 +126,7 @@ impl WorkspaceController {
set_current_workspace(&workspace.id);
Ok(workspace)
} else {
return Err(FlowyError::workspace_id().context("Opened workspace id should not be empty"));
Err(FlowyError::workspace_id().context("Opened workspace id should not be empty"))
}
}
@ -191,11 +193,11 @@ impl WorkspaceController {
let (token, server) = (self.user.token()?, self.server.clone());
tokio::spawn(async move {
match server.update_workspace(&token, params).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
// TODO: retry?
log::error!("Update workspace failed: {:?}", e);
},
}
}
});
Ok(())
@ -209,11 +211,11 @@ impl WorkspaceController {
let (token, server) = (self.user.token()?, self.server.clone());
tokio::spawn(async move {
match server.delete_workspace(&token, params).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
// TODO: retry?
log::error!("Delete workspace failed: {:?}", e);
},
}
}
});
Ok(())
@ -222,14 +224,16 @@ impl WorkspaceController {
const CURRENT_WORKSPACE_ID: &str = "current_workspace_id";
fn set_current_workspace(workspace_id: &str) { KV::set_str(CURRENT_WORKSPACE_ID, workspace_id.to_owned()); }
fn set_current_workspace(workspace_id: &str) {
KV::set_str(CURRENT_WORKSPACE_ID, workspace_id.to_owned());
}
pub fn get_current_workspace() -> Result<String, FlowyError> {
match KV::get_str(CURRENT_WORKSPACE_ID) {
None => {
Err(FlowyError::record_not_found()
.context("Current workspace not found or should call open workspace first"))
},
}
Some(workspace_id) => Ok(workspace_id),
}
}

View File

@ -19,7 +19,7 @@ impl WorkspaceTableSql {
_ => {
let changeset = WorkspaceTableChangeset::from_table(table);
diesel_update_table!(workspace_table, changeset, conn);
},
}
}
Ok(())
}

View File

@ -57,7 +57,7 @@ impl KV {
match KV_HOLDER.write() {
Ok(mut guard) => {
guard.cache.remove(key);
},
}
Err(e) => log::error!("Require write lock failed: {:?}", e),
};
@ -94,7 +94,7 @@ fn read_cache(key: &str) -> Option<KeyValue> {
Err(e) => {
log::error!("Require read lock failed: {:?}", e);
None
},
}
}
}
@ -102,7 +102,7 @@ fn update_cache(value: KeyValue) {
match KV_HOLDER.write() {
Ok(mut guard) => {
guard.cache.insert(value.key.clone(), value);
},
}
Err(e) => log::error!("Require write lock failed: {:?}", e),
};
}
@ -132,10 +132,10 @@ macro_rules! impl_set_func {
let mut item = KeyValue::new(key);
item.$set_method = Some(value);
match KV::set(item) {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
log::error!("{:?}", e)
},
}
};
}
}
@ -168,12 +168,12 @@ fn get_connection() -> Result<DBConnection, String> {
.get_connection()
.map_err(|e| format!("KVStore error: {:?}", e))?;
Ok(conn)
},
}
Err(e) => {
let msg = format!("KVStore get connection failed: {:?}", e);
log::error!("{:?}", msg);
Err(msg)
},
}
}
}

View File

@ -125,7 +125,7 @@ macro_rules! impl_sql_binary_expression {
e
);
panic!();
},
}
}
}
}

View File

@ -3,9 +3,7 @@ use crate::{
core::{
edit::ClientDocumentEditor,
revision::{DocumentRevisionCache, DocumentRevisionManager, RevisionServer},
DocumentWSReceivers,
DocumentWebSocket,
WSStateReceiver,
DocumentWSReceivers, DocumentWebSocket, WSStateReceiver,
},
errors::FlowyError,
server::Server,
@ -101,8 +99,8 @@ impl DocumentController {
match self.open_cache.get(doc_id) {
None => {
let db_pool = self.user.db_pool()?;
self.make_editor(&doc_id, db_pool).await
},
self.make_editor(doc_id, db_pool).await
}
Some(editor) => Ok(editor),
}
}
@ -123,7 +121,7 @@ impl DocumentController {
});
let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?;
self.ws_receivers.add(doc_id, doc_editor.ws_handler());
self.open_cache.insert(&doc_id, &doc_editor);
self.open_cache.insert(doc_id, &doc_editor);
Ok(doc_editor)
}
@ -162,7 +160,9 @@ pub struct OpenDocCache {
}
impl OpenDocCache {
fn new() -> Self { Self { inner: DashMap::new() } }
fn new() -> Self {
Self { inner: DashMap::new() }
}
pub(crate) fn insert(&self, doc_id: &str, doc: &Arc<ClientDocumentEditor>) {
if self.inner.contains_key(doc_id) {
@ -171,10 +171,12 @@ impl OpenDocCache {
self.inner.insert(doc_id.to_string(), doc.clone());
}
pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }
pub(crate) fn contains(&self, doc_id: &str) -> bool {
self.inner.get(doc_id).is_some()
}
pub(crate) fn get(&self, doc_id: &str) -> Option<Arc<ClientDocumentEditor>> {
if !self.contains(&doc_id) {
if !self.contains(doc_id) {
return None;
}
let opened_doc = self.inner.get(doc_id).unwrap();

View File

@ -151,9 +151,13 @@ impl ClientDocumentEditor {
}
#[tracing::instrument(level = "debug", skip(self))]
pub fn stop(&self) { self.ws_manager.stop(); }
pub fn stop(&self) {
self.ws_manager.stop();
}
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.receiver() }
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> {
self.ws_manager.receiver()
}
}
fn spawn_edit_queue(
@ -185,5 +189,7 @@ impl ClientDocumentEditor {
Ok(delta)
}
pub fn rev_manager(&self) -> Arc<DocumentRevisionManager> { self.rev_manager.clone() }
pub fn rev_manager(&self) -> Arc<DocumentRevisionManager> {
self.rev_manager.clone()
}
}

View File

@ -51,7 +51,7 @@ impl EditorCommandQueue {
stream
.for_each(|command| async {
match self.handle_command(command).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
}
})
@ -68,7 +68,7 @@ impl EditorCommandQueue {
drop(document);
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::ComposeRemoteDelta {
revisions,
client_delta,
@ -96,7 +96,7 @@ impl EditorCommandQueue {
);
let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
let _ = ret.send(Ok(server_revision));
},
}
EditorCommand::OverrideDelta { revisions, delta, ret } => {
let mut document = self.document.write().await;
let _ = document.set_delta(delta);
@ -107,7 +107,7 @@ impl EditorCommandQueue {
assert_eq!(repeated_revision.last().unwrap().md5, md5);
let _ = self.rev_manager.reset_document(repeated_revision).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::TransformRevision { revisions, ret } => {
let f = || async {
let new_delta = make_delta_from_revisions(revisions)?;
@ -130,21 +130,21 @@ impl EditorCommandQueue {
})
};
let _ = ret.send(f().await);
},
}
EditorCommand::Insert { index, data, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.insert(index, data)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::Delete { interval, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.delete(interval)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::Format {
interval,
attribute,
@ -155,42 +155,42 @@ impl EditorCommandQueue {
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::Replace { interval, data, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.replace(interval, data)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::CanUndo { ret } => {
let _ = ret.send(self.document.read().await.can_undo());
},
}
EditorCommand::CanRedo { ret } => {
let _ = ret.send(self.document.read().await.can_redo());
},
}
EditorCommand::Undo { ret } => {
let mut write_guard = self.document.write().await;
let UndoResult { delta } = write_guard.undo()?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::Redo { ret } => {
let mut write_guard = self.document.write().await;
let UndoResult { delta } = write_guard.redo()?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
},
}
EditorCommand::ReadDoc { ret } => {
let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data));
},
}
EditorCommand::ReadDocDelta { ret } => {
let delta = self.document.read().await.delta().clone();
let _ = ret.send(Ok(delta));
},
}
}
Ok(())
}
@ -215,20 +215,20 @@ fn make_client_and_server_revision(
md5: DocumentMD5,
) -> (Revision, Option<Revision>) {
let client_revision = Revision::new(
&doc_id,
doc_id,
base_rev_id,
rev_id,
client_delta.to_bytes(),
&user_id,
user_id,
md5.clone(),
);
match server_delta {
None => (client_revision, None),
Some(server_delta) => {
let server_revision = Revision::new(&doc_id, base_rev_id, rev_id, server_delta.to_bytes(), &user_id, md5);
let server_revision = Revision::new(doc_id, base_rev_id, rev_id, server_delta.to_bytes(), user_id, md5);
(client_revision, Some(server_revision))
},
}
}
}

View File

@ -59,7 +59,9 @@ impl DocumentRevisionCache {
Ok(record)
}
pub async fn ack(&self, rev_id: i64) { self.memory_cache.ack(&rev_id).await; }
pub async fn ack(&self, rev_id: i64) {
self.memory_cache.ack(&rev_id).await;
}
pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
match self.memory_cache.get(&rev_id).await {
@ -69,11 +71,11 @@ impl DocumentRevisionCache {
assert_eq!(records.len(), 1);
}
records.pop()
},
}
Err(e) => {
tracing::error!("{}", e);
None
},
}
},
Some(revision) => Some(revision),
}
@ -141,7 +143,7 @@ impl RevisionMemoryCacheDelegate for Arc<SQLitePersistence> {
"checkpoint_result",
&format!("{} records were saved", records.len()).as_str(),
);
let _ = self.write_revision_records(records, &conn)?;
let _ = self.write_revision_records(records, conn)?;
}
Ok(())
}
@ -153,7 +155,7 @@ impl RevisionMemoryCacheDelegate for Arc<SQLitePersistence> {
state: RevisionTableState::Ack,
};
match self.update_revision_record(vec![changeset]) {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("{}", e),
}
}
@ -167,5 +169,7 @@ pub struct RevisionRecord {
}
impl RevisionRecord {
pub fn ack(&mut self) { self.state = RevisionState::Ack; }
pub fn ack(&mut self) {
self.state = RevisionState::Ack;
}
}

View File

@ -200,7 +200,9 @@ pub enum RevisionTableState {
}
impl std::default::Default for RevisionTableState {
fn default() -> Self { RevisionTableState::Local }
fn default() -> Self {
RevisionTableState::Local
}
}
impl std::convert::From<i32> for RevisionTableState {
@ -211,13 +213,15 @@ impl std::convert::From<i32> for RevisionTableState {
o => {
log::error!("Unsupported rev state {}, fallback to RevState::Local", o);
RevisionTableState::Local
},
}
}
}
}
impl RevisionTableState {
pub fn value(&self) -> i32 { *self as i32 }
pub fn value(&self) -> i32 {
*self as i32
}
}
impl_sql_integer_expression!(RevisionTableState);
@ -246,7 +250,7 @@ pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevisionTable)
table.base_rev_id,
table.rev_id,
Bytes::from(table.data),
&user_id,
user_id,
md5,
);
RevisionRecord {
@ -265,7 +269,9 @@ pub enum RevTableType {
}
impl std::default::Default for RevTableType {
fn default() -> Self { RevTableType::Local }
fn default() -> Self {
RevTableType::Local
}
}
impl std::convert::From<i32> for RevTableType {
@ -276,12 +282,14 @@ impl std::convert::From<i32> for RevTableType {
o => {
log::error!("Unsupported rev type {}, fallback to RevTableType::Local", o);
RevTableType::Local
},
}
}
}
}
impl RevTableType {
pub fn value(&self) -> i32 { *self as i32 }
pub fn value(&self) -> i32 {
*self as i32
}
}
impl_sql_integer_expression!(RevTableType);

View File

@ -97,9 +97,13 @@ impl DocumentRevisionManager {
Ok(())
}
pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
pub fn rev_id(&self) -> i64 {
self.rev_id_counter.value()
}
pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub fn set_rev_id(&self, rev_id: i64) {
self.rev_id_counter.set(rev_id);
}
pub fn next_rev_id_pair(&self) -> (i64, i64) {
let cur = self.rev_id_counter.value();
@ -127,7 +131,9 @@ impl DocumentRevisionManager {
})
}
pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
pub async fn latest_revision(&self) -> Revision {
self.cache.latest_revision().await
}
pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
self.cache.get(rev_id).await.map(|record| record.revision)
@ -150,7 +156,9 @@ impl std::default::Default for RevisionSyncSequence {
}
impl RevisionSyncSequence {
fn new() -> Self { RevisionSyncSequence::default() }
fn new() -> Self {
RevisionSyncSequence::default()
}
async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
// The last revision's rev_id must be greater than the new one.
@ -189,7 +197,9 @@ impl RevisionSyncSequence {
}
}
async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
async fn next_sync_rev_id(&self) -> Option<i64> {
self.local_revs.read().await.front().copied()
}
}
struct RevisionLoader {
@ -223,7 +233,7 @@ impl RevisionLoader {
.filter(|record| future::ready(record.state == RevisionState::Local))
.for_each(|record| async move {
match self.cache.add(record.revision, record.state, false).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("{}", e),
}
})
@ -268,12 +278,18 @@ fn correct_delta(delta: &mut RichTextDelta) {
#[cfg(feature = "flowy_unit_test")]
impl RevisionSyncSequence {
#[allow(dead_code)]
pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> {
self.revs_map.clone()
}
#[allow(dead_code)]
pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> {
self.local_revs.clone()
}
}
#[cfg(feature = "flowy_unit_test")]
impl DocumentRevisionManager {
pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> { self.cache.clone() }
pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> {
self.cache.clone()
}
}

View File

@ -29,7 +29,9 @@ impl DocumentRevisionMemoryCache {
}
}
pub(crate) fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
pub(crate) fn contains(&self, rev_id: &i64) -> bool {
self.revs_map.contains_key(rev_id)
}
pub(crate) async fn add<'a>(&'a self, record: Cow<'a, RevisionRecord>) {
let record = match record {
@ -55,7 +57,7 @@ impl DocumentRevisionMemoryCache {
pub(crate) async fn ack(&self, rev_id: &i64) {
match self.revs_map.get_mut(rev_id) {
None => {},
None => {}
Some(mut record) => record.ack(),
}
@ -69,7 +71,7 @@ impl DocumentRevisionMemoryCache {
}
pub(crate) async fn get(&self, rev_id: &i64) -> Option<RevisionRecord> {
self.revs_map.get(&rev_id).map(|r| r.value().clone())
self.revs_map.get(rev_id).map(|r| r.value().clone())
}
pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result<Vec<RevisionRecord>, FlowyError> {
@ -123,10 +125,10 @@ impl DocumentRevisionMemoryCache {
// https://stackoverflow.com/questions/28952411/what-is-the-idiomatic-way-to-pop-the-last-n-elements-in-a-mutable-vec
let mut save_records: Vec<RevisionRecord> = vec![];
revs_write_guard.iter().for_each(|rev_id| match rev_map.get(rev_id) {
None => {},
None => {}
Some(value) => {
save_records.push(value.value().clone());
},
}
});
if delegate.checkpoint_tick(save_records).is_ok() {

View File

@ -15,8 +15,7 @@ use lib_ws::WSConnectState;
use std::{convert::TryFrom, sync::Arc};
use tokio::{
sync::{
broadcast,
mpsc,
broadcast, mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
},
task::spawn_blocking,
@ -77,7 +76,9 @@ impl HttpWebSocketManager {
tokio::spawn(stream.run());
}
pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> {
self.state.subscribe()
}
}
impl DocumentWebSocketManager for Arc<HttpWebSocketManager> {
@ -87,27 +88,31 @@ impl DocumentWebSocketManager for Arc<HttpWebSocketManager> {
}
}
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> { self.clone() }
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> {
self.clone()
}
}
impl DocumentWSReceiver for HttpWebSocketManager {
fn receive_ws_data(&self, doc_data: DocumentServerWSData) {
match self.ws_msg_tx.send(doc_data) {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e),
}
}
fn connect_state_changed(&self, state: &WSConnectState) {
match self.state.send(state.clone()) {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("{}", e),
}
}
}
impl std::ops::Drop for HttpWebSocketManager {
fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) }
fn drop(&mut self) {
tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id)
}
}
pub trait DocumentWSSteamConsumer: Send + Sync {
@ -168,7 +173,7 @@ impl DocumentWSStream {
stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e),
}
})
@ -185,20 +190,20 @@ impl DocumentWSStream {
match ty {
DocumentServerWSDataType::ServerPushRev => {
let _ = self.consumer.receive_push_revision(bytes).await?;
},
}
DocumentServerWSDataType::ServerPullRev => {
let range = RevisionRange::try_from(bytes)?;
let _ = self.consumer.pull_revisions_in_range(range).await?;
},
}
DocumentServerWSDataType::ServerAck => {
let rev_id = RevId::try_from(bytes).unwrap().value;
let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await;
},
}
DocumentServerWSDataType::UserConnect => {
let new_user = NewDocumentUser::try_from(bytes)?;
let _ = self.consumer.receive_new_user_connect(new_user).await;
// Notify the user that someone has connected to this document
},
}
}
Ok(())
}
@ -258,7 +263,7 @@ impl DocumentWSSink {
stream
.for_each(|_| async {
match self.send_next_revision().await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("[DocumentSink]: Send failed, {:?}", e),
}
})
@ -270,12 +275,12 @@ impl DocumentWSSink {
None => {
tracing::trace!("Finish synchronizing revisions");
Ok(())
},
}
Some(data) => {
tracing::trace!("[DocumentSink]: send: {}:{}-{:?}", data.doc_id, data.id(), data.ty);
self.ws_sender.send(data)
// let _ = tokio::time::timeout(Duration::from_millis(2000),
},
}
}
}
}

View File

@ -8,7 +8,9 @@ pub(crate) struct LocalWebSocketManager {}
impl DocumentWebSocketManager for Arc<LocalWebSocketManager> {
fn stop(&self) {}
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> { self.clone() }
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> {
self.clone()
}
}
impl DocumentWSReceiver for LocalWebSocketManager {

View File

@ -1,10 +1,6 @@
use crate::core::{
web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager},
DocumentRevisionManager,
DocumentWSReceiver,
DocumentWebSocket,
EditorCommand,
TransformDeltas,
DocumentRevisionManager, DocumentWSReceiver, DocumentWebSocket, EditorCommand, TransformDeltas,
};
use bytes::Bytes;
use flowy_collaboration::{
@ -84,10 +80,10 @@ fn listen_document_ws_state(
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
match state {
WSConnectState::Init => {},
WSConnectState::Connecting => {},
WSConnectState::Connected => {},
WSConnectState::Disconnected => {},
WSConnectState::Init => {}
WSConnectState::Connecting => {}
WSConnectState::Connected => {}
WSConnectState::Disconnected => {}
}
}
});
@ -196,7 +192,7 @@ pub(crate) async fn handle_remote_revision(
});
let _ = rx.await.map_err(internal_error)??;
Ok(None)
},
}
Some(server_prime) => {
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta {
@ -206,7 +202,7 @@ pub(crate) async fn handle_remote_revision(
ret,
});
Ok(rx.await.map_err(internal_error)??)
},
}
}
}
@ -233,9 +229,13 @@ impl SharedWSSinkDataProvider {
}
#[allow(dead_code)]
pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); }
pub(crate) async fn push_front(&self, data: DocumentClientWSData) {
self.shared.write().await.push_front(data);
}
async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); }
async fn push_back(&self, data: DocumentClientWSData) {
self.shared.write().await.push_back(data);
}
async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> {
let source_ty = self.source_ty.read().await.clone();
@ -244,11 +244,11 @@ impl SharedWSSinkDataProvider {
None => {
*self.source_ty.write().await = SourceType::Revision;
Ok(None)
},
}
Some(data) => {
tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
Ok(Some(data.clone()))
},
}
},
SourceType::Revision => {
if !self.shared.read().await.is_empty() {
@ -260,15 +260,15 @@ impl SharedWSSinkDataProvider {
Some(rev) => {
let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
},
}
None => {
//
let doc_id = self.rev_manager.doc_id.clone();
let latest_rev_id = self.rev_manager.rev_id();
Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id)))
},
}
},
}
}
}
}
@ -287,22 +287,22 @@ impl SharedWSSinkDataProvider {
tracing::error!("The front element's {} is not equal to the {}", expected_id, id);
false
}
},
}
};
if should_pop {
let _ = self.shared.write().await.pop_front();
}
},
}
SourceType::Revision => {
match id.parse::<i64>() {
Ok(rev_id) => {
let _ = self.rev_manager.ack_revision(rev_id).await?;
},
}
Err(e) => {
tracing::error!("Parse rev_id from {} failed. {}", id, e);
},
}
};
},
}
}
Ok(())

View File

@ -7,7 +7,9 @@ pub(crate) enum DocObservable {
}
impl std::convert::From<DocObservable> for i32 {
fn from(o: DocObservable) -> Self { o as i32 }
fn from(o: DocObservable) -> Self {
o as i32
}
}
#[allow(dead_code)]

View File

@ -14,14 +14,14 @@ impl ResponseMiddleware for DocMiddleware {
log::error!("document user is unauthorized");
match token {
None => {},
None => {}
Some(_token) => {
// let error =
// FlowyError::new(ErrorCode::UserUnauthorized, "");
// observable(token,
// WorkspaceObservable::UserUnauthorized).error(error).
// build()
},
}
}
}
}

View File

@ -8,7 +8,9 @@ pub struct DocServer {
}
impl DocServer {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
pub fn new(config: ClientServerConfiguration) -> Self {
Self { config }
}
}
impl DocumentServerAPI for DocServer {

View File

@ -29,7 +29,9 @@ impl std::default::Default for DocumentWSReceivers {
}
impl DocumentWSReceivers {
pub fn new() -> Self { DocumentWSReceivers::default() }
pub fn new() -> Self {
DocumentWSReceivers::default()
}
pub(crate) fn add(&self, doc_id: &str, receiver: Arc<dyn DocumentWSReceiver>) {
if self.receivers.contains_key(doc_id) {
@ -38,23 +40,25 @@ impl DocumentWSReceivers {
self.receivers.insert(doc_id.to_string(), receiver);
}
pub(crate) fn remove(&self, id: &str) { self.receivers.remove(id); }
pub(crate) fn remove(&self, id: &str) {
self.receivers.remove(id);
}
pub fn did_receive_data(&self, data: Bytes) {
let data: DocumentServerWSData = data.try_into().unwrap();
match self.receivers.get(&data.doc_id) {
None => {
log::error!("Can't find any source handler for {:?}", data.doc_id);
},
}
Some(handler) => {
handler.receive_ws_data(data);
},
}
}
}
pub fn ws_connect_state_changed(&self, state: &WSConnectState) {
self.receivers.iter().for_each(|receiver| {
receiver.value().connect_state_changed(&state);
receiver.value().connect_state_changed(state);
});
}
}

View File

@ -46,20 +46,20 @@ impl EditorTest {
match script {
EditorScript::InsertText(s, offset) => {
self.editor.insert(offset, s).await.unwrap();
},
}
EditorScript::Delete(interval) => {
self.editor.delete(interval).await.unwrap();
},
}
EditorScript::Replace(interval, s) => {
self.editor.replace(interval, s).await.unwrap();
},
}
EditorScript::AssertRevisionState(rev_id, state) => {
let record = cache.get(rev_id).await.unwrap();
assert_eq!(record.state, state);
},
}
EditorScript::AssertCurrentRevId(rev_id) => {
assert_eq!(self.editor.rev_manager().rev_id(), rev_id);
},
}
EditorScript::AssertNextRevId(rev_id) => {
let next_revision = rev_manager.next_sync_revision().await.unwrap();
if rev_id.is_none() {
@ -68,7 +68,7 @@ impl EditorTest {
}
let next_revision = next_revision.unwrap();
assert_eq!(next_revision.rev_id, rev_id.unwrap());
},
}
EditorScript::AssertJson(expected) => {
let expected_delta: RichTextDelta = serde_json::from_str(expected).unwrap();
let delta = self.editor.doc_delta().await.unwrap();
@ -77,7 +77,7 @@ impl EditorTest {
eprintln!("❌ receive: {}", delta.to_json());
}
assert_eq!(expected_delta, delta);
},
}
}
sleep(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)).await;
}

View File

@ -112,31 +112,31 @@ impl TestBuilder {
tracing::debug!("Insert delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Delete(delta_i, iv) => {
let document = &mut self.documents[*delta_i];
let delta = document.replace(*iv, "").unwrap();
tracing::trace!("Delete delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Replace(delta_i, iv, s) => {
let document = &mut self.documents[*delta_i];
let delta = document.replace(*iv, s).unwrap();
tracing::trace!("Replace delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::InsertBold(delta_i, s, iv) => {
let document = &mut self.documents[*delta_i];
document.insert(iv.start, s).unwrap();
document.format(*iv, RichTextAttribute::Bold(true)).unwrap();
},
}
TestOp::Bold(delta_i, iv, enable) => {
let document = &mut self.documents[*delta_i];
let attribute = RichTextAttribute::Bold(*enable);
let delta = document.format(*iv, attribute).unwrap();
tracing::trace!("Bold delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Italic(delta_i, iv, enable) => {
let document = &mut self.documents[*delta_i];
let attribute = match *enable {
@ -146,21 +146,21 @@ impl TestBuilder {
let delta = document.format(*iv, attribute).unwrap();
tracing::trace!("Italic delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Header(delta_i, iv, level) => {
let document = &mut self.documents[*delta_i];
let attribute = RichTextAttribute::Header(*level);
let delta = document.format(*iv, attribute).unwrap();
tracing::trace!("Header delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Link(delta_i, iv, link) => {
let document = &mut self.documents[*delta_i];
let attribute = RichTextAttribute::Link(link.to_owned());
let delta = document.format(*iv, attribute).unwrap();
tracing::trace!("Link delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Bullet(delta_i, iv, enable) => {
let document = &mut self.documents[*delta_i];
let attribute = RichTextAttribute::Bullet(*enable);
@ -168,7 +168,7 @@ impl TestBuilder {
tracing::debug!("Bullet delta: {}", delta.to_json());
self.deltas.insert(*delta_i, Some(delta));
},
}
TestOp::Transform(delta_a_i, delta_b_i) => {
let (a_prime, b_prime) = self.documents[*delta_a_i]
.delta()
@ -181,7 +181,7 @@ impl TestBuilder {
self.documents[*delta_a_i].set_delta(data_left);
self.documents[*delta_b_i].set_delta(data_right);
},
}
TestOp::TransformPrime(a_doc_index, b_doc_index) => {
let (prime_left, prime_right) = self.documents[*a_doc_index]
.delta()
@ -190,7 +190,7 @@ impl TestBuilder {
self.primes.insert(*a_doc_index, Some(prime_left));
self.primes.insert(*b_doc_index, Some(prime_right));
},
}
TestOp::Invert(delta_a_i, delta_b_i) => {
let delta_a = &self.documents[*delta_a_i].delta();
let delta_b = &self.documents[*delta_b_i].delta();
@ -212,19 +212,19 @@ impl TestBuilder {
assert_eq!(delta_a, &&new_delta_after_undo);
self.documents[*delta_a_i].set_delta(new_delta_after_undo);
},
}
TestOp::Undo(delta_i) => {
self.documents[*delta_i].undo().unwrap();
},
}
TestOp::Redo(delta_i) => {
self.documents[*delta_i].redo().unwrap();
},
}
TestOp::Wait(mills_sec) => {
std::thread::sleep(Duration::from_millis(*mills_sec as u64));
},
}
TestOp::AssertStr(delta_i, expected) => {
assert_eq!(&self.documents[*delta_i].to_plain_string(), expected);
},
}
TestOp::AssertDocJson(delta_i, expected) => {
let delta_json = self.documents[*delta_i].to_json();
@ -236,7 +236,7 @@ impl TestBuilder {
log::error!("❌ receive: {}", delta_json);
}
assert_eq!(target_delta, expected_delta);
},
}
TestOp::AssertPrimeJson(doc_i, expected) => {
let prime_json = self.primes[*doc_i].as_ref().unwrap().to_json();
@ -248,11 +248,11 @@ impl TestBuilder {
log::error!("❌ receive prime: {}", prime_json);
}
assert_eq!(target_prime, expected_prime);
},
}
TestOp::DocComposeDelta(doc_index, delta_i) => {
let delta = self.deltas.get(*delta_i).unwrap().as_ref().unwrap();
self.documents[*doc_index].compose_delta(delta.clone()).unwrap();
},
}
TestOp::DocComposePrime(doc_index, prime_i) => {
let delta = self
.primes
@ -262,7 +262,7 @@ impl TestBuilder {
.unwrap();
let new_delta = self.documents[*doc_index].delta().compose(delta).unwrap();
self.documents[*doc_index].set_delta(new_delta);
},
}
}
}
@ -279,12 +279,16 @@ impl TestBuilder {
pub struct Rng(StdRng);
impl Default for Rng {
fn default() -> Self { Rng(StdRng::from_rng(thread_rng()).unwrap()) }
fn default() -> Self {
Rng(StdRng::from_rng(thread_rng()).unwrap())
}
}
impl Rng {
#[allow(dead_code)]
pub fn from_seed(seed: [u8; 32]) -> Self { Rng(StdRng::from_seed(seed)) }
pub fn from_seed(seed: [u8; 32]) -> Self {
Rng(StdRng::from_seed(seed))
}
pub fn gen_string(&mut self, len: usize) -> String {
(0..len)
@ -311,13 +315,13 @@ impl Rng {
match self.0.gen_range(0.0, 1.0) {
f if f < 0.2 => {
delta.insert(&self.gen_string(i), RichTextAttributes::default());
},
}
f if f < 0.4 => {
delta.delete(i);
},
}
_ => {
delta.retain(i, RichTextAttributes::default());
},
}
}
}
if self.0.gen_range(0.0, 1.0) < 0.3 {

View File

@ -18,7 +18,9 @@ pub struct FlowyError {
macro_rules! static_flowy_error {
($name:ident, $code:expr) => {
#[allow(non_snake_case, missing_docs)]
pub fn $name() -> FlowyError { $code.into() }
pub fn $name() -> FlowyError {
$code.into()
}
};
}
@ -81,7 +83,9 @@ where
}
impl fmt::Display for FlowyError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}: {}", &self.code, &self.msg) }
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}: {}", &self.code, &self.msg)
}
}
impl lib_dispatch::Error for FlowyError {
@ -92,9 +96,13 @@ impl lib_dispatch::Error for FlowyError {
}
impl std::convert::From<std::io::Error> for FlowyError {
fn from(error: std::io::Error) -> Self { FlowyError::internal().context(error) }
fn from(error: std::io::Error) -> Self {
FlowyError::internal().context(error)
}
}
impl std::convert::From<protobuf::ProtobufError> for FlowyError {
fn from(e: protobuf::ProtobufError) -> Self { FlowyError::internal().context(e) }
fn from(e: protobuf::ProtobufError) -> Self {
FlowyError::internal().context(e)
}
}

View File

@ -16,7 +16,7 @@ fn server_error_to_flowy_error(code: ServerErrorCode) -> ErrorCode {
ServerErrorCode::RecordNotFound => ErrorCode::RecordNotFound,
ServerErrorCode::ConnectRefused | ServerErrorCode::ConnectTimeout | ServerErrorCode::ConnectClose => {
ErrorCode::ConnectError
},
}
_ => ErrorCode::Internal,
}
}

View File

@ -1,5 +1,7 @@
use crate::FlowyError;
impl std::convert::From<flowy_collaboration::errors::CollaborateError> for FlowyError {
fn from(error: flowy_collaboration::errors::CollaborateError) -> Self { FlowyError::internal().context(error) }
fn from(error: flowy_collaboration::errors::CollaborateError) -> Self {
FlowyError::internal().context(error)
}
}

View File

@ -1,15 +1,21 @@
use crate::FlowyError;
impl std::convert::From<flowy_database::Error> for FlowyError {
fn from(error: flowy_database::Error) -> Self { FlowyError::internal().context(error) }
fn from(error: flowy_database::Error) -> Self {
FlowyError::internal().context(error)
}
}
impl std::convert::From<::r2d2::Error> for FlowyError {
fn from(error: r2d2::Error) -> Self { FlowyError::internal().context(error) }
fn from(error: r2d2::Error) -> Self {
FlowyError::internal().context(error)
}
}
// use diesel::result::{Error, DatabaseErrorKind};
// use lib_sqlite::ErrorKind;
impl std::convert::From<lib_sqlite::Error> for FlowyError {
fn from(error: lib_sqlite::Error) -> Self { FlowyError::internal().context(error) }
fn from(error: lib_sqlite::Error) -> Self {
FlowyError::internal().context(error)
}
}

View File

@ -1,5 +1,7 @@
use crate::FlowyError;
impl std::convert::From<lib_ot::errors::OTError> for FlowyError {
fn from(error: lib_ot::errors::OTError) -> Self { FlowyError::internal().context(error) }
fn from(error: lib_ot::errors::OTError) -> Self {
FlowyError::internal().context(error)
}
}

View File

@ -1,5 +1,7 @@
use crate::FlowyError;
impl std::convert::From<serde_json::Error> for FlowyError {
fn from(error: serde_json::Error) -> Self { FlowyError::internal().context(error) }
fn from(error: serde_json::Error) -> Self {
FlowyError::internal().context(error)
}
}

View File

@ -20,7 +20,9 @@ impl NetworkType {
}
impl std::default::Default for NetworkType {
fn default() -> Self { NetworkType::UnknownNetworkType }
fn default() -> Self {
NetworkType::UnknownNetworkType
}
}
#[derive(ProtoBuf, Debug, Default, Clone)]

View File

@ -9,7 +9,9 @@ use std::sync::Arc;
use tokio::sync::broadcast::Receiver;
impl FlowyRawWebSocket for Arc<WSController> {
fn initialize(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn initialize(&self) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn start_connect(&self, addr: String, _user_id: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
@ -27,7 +29,9 @@ impl FlowyRawWebSocket for Arc<WSController> {
})
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> {
self.subscribe_state()
}
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();

View File

@ -7,9 +7,7 @@ use flowy_collaboration::{
},
errors::CollaborateError,
protobuf::{
DocumentClientWSData as DocumentClientWSDataPB,
RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB,
DocumentClientWSData as DocumentClientWSDataPB, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB,
},
sync::*,
util::repeated_revision_from_repeated_revision_pb,
@ -58,10 +56,10 @@ impl LocalDocumentServer {
.doc_manager
.handle_client_revisions(user, document_client_data)
.await?;
},
}
DocumentClientWSDataType::ClientPing => {
let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
},
}
}
Ok(())
}
@ -72,7 +70,9 @@ struct LocalDocServerPersistence {
}
impl Debug for LocalDocServerPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") }
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("LocalDocServerPersistence")
}
}
impl std::default::Default for LocalDocServerPersistence {
@ -93,7 +93,7 @@ impl DocumentPersistence for LocalDocServerPersistence {
Some(val) => {
//
Ok(val.value().clone())
},
}
}
})
}
@ -130,15 +130,17 @@ struct LocalDocumentUser {
}
impl RevisionUser for LocalDocumentUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn user_id(&self) -> String {
self.user_id.clone()
}
fn receive(&self, resp: SyncResponse) {
let sender = self.ws_sender.clone();
let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
tracing::error!("LocalDocumentUser send message failed: {}", e);
},
}
};
tokio::spawn(async move {
@ -150,7 +152,7 @@ impl RevisionUser for LocalDocumentUser {
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
}
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
@ -158,7 +160,7 @@ impl RevisionUser for LocalDocumentUser {
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
}
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
@ -166,10 +168,10 @@ impl RevisionUser for LocalDocumentUser {
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
}
SyncResponse::NewRevision(mut _repeated_revision) => {
// unimplemented!()
},
}
}
});
}

View File

@ -70,10 +70,10 @@ impl LocalWebSocket {
Ok::<(), FlowyError>(())
};
match fut().await {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("[LocalWebSocket] error: {:?}", e),
}
},
}
Err(e) => tracing::error!("[LocalWebSocket] error: {}", e),
}
}
@ -102,18 +102,26 @@ impl FlowyRawWebSocket for LocalWebSocket {
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> {
self.state_sender.subscribe()
}
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
self.receivers.insert(receiver.source(), receiver);
Ok(())
}
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
Ok(Arc::new(self.ws_sender.clone()))
}
}
#[derive(Clone)]
@ -136,5 +144,7 @@ impl FlowyWSSender for LocalWSSender {
impl std::ops::Deref for LocalWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -42,7 +42,7 @@ impl FlowyWebSocketConnect {
pub async fn init(&self) {
match self.inner.initialize().await {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("FlowyWebSocketConnect init error: {:?}", e),
}
}
@ -54,7 +54,9 @@ impl FlowyWebSocketConnect {
Ok(())
}
pub async fn stop(&self) { let _ = self.inner.stop_connect().await; }
pub async fn stop(&self) {
let _ = self.inner.stop_connect().await;
}
pub fn update_network_type(&self, new_type: &NetworkType) {
tracing::debug!("Network new state: {:?}", new_type);
@ -67,11 +69,11 @@ impl FlowyWebSocketConnect {
(false, true) => {
let ws_controller = self.inner.clone();
tokio::spawn(async move { retry_connect(ws_controller, 100).await });
},
}
(true, false) => {
//
},
_ => {},
}
_ => {}
}
*self.connect_type.write() = new_type.clone();
@ -82,14 +84,18 @@ impl FlowyWebSocketConnect {
self.inner.subscribe_connect_state()
}
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> {
self.status_notifier.subscribe()
}
pub fn add_ws_message_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.inner.add_receiver(receiver)?;
Ok(())
}
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { self.inner.sender() }
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
self.inner.sender()
}
}
#[tracing::instrument(level = "debug", skip(ws_conn))]
@ -103,16 +109,16 @@ pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
Ok(state) => {
tracing::info!("Websocket state changed: {}", state);
match state {
WSConnectState::Init => {},
WSConnectState::Connected => {},
WSConnectState::Connecting => {},
WSConnectState::Init => {}
WSConnectState::Connected => {}
WSConnectState::Connecting => {}
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
}
},
}
Err(e) => {
tracing::error!("Websocket state notify error: {:?}", e);
break;
},
}
}
}
});
@ -123,9 +129,9 @@ pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
async fn retry_connect(ws: Arc<dyn FlowyRawWebSocket>, count: usize) {
match ws.reconnect(count).await {
Ok(_) => {},
Ok(_) => {}
Err(e) => {
tracing::error!("websocket connect failed: {:?}", e);
},
}
}
}

View File

@ -53,11 +53,17 @@ impl DocumentUser for DocumentUserImpl {
Ok(doc_dir)
}
fn user_id(&self) -> Result<String, FlowyError> { self.user.user_id() }
fn user_id(&self) -> Result<String, FlowyError> {
self.user.user_id()
}
fn token(&self) -> Result<String, FlowyError> { self.user.token() }
fn token(&self) -> Result<String, FlowyError> {
self.user.token()
}
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> { self.user.db_pool() }
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
self.user.db_pool()
}
}
struct DocumentWebSocketAdapter {
@ -76,12 +82,18 @@ impl DocumentWebSocket for DocumentWebSocketAdapter {
Ok(())
}
fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_conn.subscribe_websocket_state() }
fn subscribe_state_changed(&self) -> WSStateReceiver {
self.ws_conn.subscribe_websocket_state()
}
}
struct WSMessageReceiverAdaptor(Arc<DocumentWSReceivers>);
impl WSMessageReceiver for WSMessageReceiverAdaptor {
fn source(&self) -> WSModule { WSModule::Doc }
fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
fn source(&self) -> WSModule {
WSModule::Doc
}
fn receive_message(&self, msg: WebSocketRawMessage) {
self.0.did_receive_data(Bytes::from(msg.data));
}
}

View File

@ -122,7 +122,9 @@ impl FlowySDK {
}
}
pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
pub fn dispatcher(&self) -> Arc<EventDispatcher> {
self.dispatcher.clone()
}
}
fn _init(
@ -161,26 +163,26 @@ async fn _listen_user_status(
UserStatus::Login { token, user_id } => {
let _ = core.user_did_sign_in(&token).await?;
let _ = ws_conn.start(token, user_id).await?;
},
}
UserStatus::Logout { .. } => {
core.user_did_logout().await;
let _ = ws_conn.stop().await;
},
}
UserStatus::Expired { .. } => {
core.user_session_expired().await;
let _ = ws_conn.stop().await;
},
}
UserStatus::SignUp { profile, ret } => {
let _ = core.user_did_sign_up(&profile.token).await?;
let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
let _ = ret.send(());
},
}
}
Ok::<(), FlowyError>(())
};
match result().await {
Ok(_) => {},
Ok(_) => {}
Err(e) => log::error!("{}", e),
}
}
@ -194,7 +196,7 @@ async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>,
fn init_kv(root: &str) {
match flowy_database::kv::KV::init(root) {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::error!("Init kv store failedL: {}", e),
}
}

View File

@ -15,8 +15,14 @@ pub fn mk_modules(
vec![user_module, core_module, network_module]
}
fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module::create(user_session) }
fn mk_user_module(user_session: Arc<UserSession>) -> Module {
flowy_user::module::create(user_session)
}
fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) }
fn mk_core_module(core: Arc<CoreContext>) -> Module {
flowy_core::module::create(core)
}
fn mk_network_module(ws_conn: Arc<FlowyWebSocketConnect>) -> Module { flowy_net::module::create(ws_conn) }
fn mk_network_module(ws_conn: Arc<FlowyWebSocketConnect>) -> Module {
flowy_net::module::create(ws_conn)
}

View File

@ -11,8 +11,12 @@ use std::{
pub type CoreModuleEventBuilder = EventBuilder<FlowyError>;
impl CoreModuleEventBuilder {
pub fn new(sdk: FlowySDKTest) -> Self { EventBuilder::test(TestContext::new(sdk)) }
pub fn user_profile(&self) -> &Option<UserProfile> { &self.user_profile }
pub fn new(sdk: FlowySDKTest) -> Self {
EventBuilder::test(TestContext::new(sdk))
}
pub fn user_profile(&self) -> &Option<UserProfile> {
&self.user_profile
}
}
pub type UserModuleEventBuilder = CoreModuleEventBuilder;
@ -44,10 +48,10 @@ where
Ok(bytes) => {
let module_request = self.get_request();
self.context.request = Some(module_request.payload(bytes))
},
}
Err(e) => {
log::error!("Set payload failed: {:?}", e);
},
}
}
self
}
@ -83,7 +87,7 @@ where
Ok(Ok(data)) => data,
Ok(Err(e)) => {
panic!("parse failed: {:?}", e)
},
}
Err(e) => panic!("Internal error: {:?}", e),
}
}
@ -104,7 +108,9 @@ where
self
}
fn dispatch(&self) -> Arc<EventDispatcher> { self.context.sdk.dispatcher() }
fn dispatch(&self) -> Arc<EventDispatcher> {
self.context.sdk.dispatcher()
}
fn get_response(&self) -> EventResponse {
self.context
@ -114,7 +120,9 @@ where
.clone()
}
fn get_request(&mut self) -> ModuleRequest { self.context.request.take().expect("must call event first") }
fn get_request(&mut self) -> ModuleRequest {
self.context.request.take().expect("must call event first")
}
}
#[derive(Clone)]

View File

@ -72,10 +72,10 @@ pub struct ViewTest {
impl ViewTest {
pub async fn new(sdk: &FlowySDKTest) -> Self {
let workspace = create_workspace(&sdk, "Workspace", "").await;
open_workspace(&sdk, &workspace.id).await;
let app = create_app(&sdk, "App", "AppFlowy GitHub Project", &workspace.id).await;
let view = create_view(&sdk, &app.id).await;
let workspace = create_workspace(sdk, "Workspace", "").await;
open_workspace(sdk, &workspace.id).await;
let app = create_app(sdk, "App", "AppFlowy GitHub Project", &workspace.id).await;
let view = create_view(sdk, &app.id).await;
Self {
sdk: sdk.clone(),
workspace,
@ -291,11 +291,17 @@ pub fn root_dir() -> String {
root_dir
}
pub fn random_email() -> String { format!("{}@appflowy.io", uuid_string()) }
pub fn random_email() -> String {
format!("{}@appflowy.io", uuid_string())
}
pub fn login_email() -> String { "annie2@appflowy.io".to_string() }
pub fn login_email() -> String {
"annie2@appflowy.io".to_string()
}
pub fn login_password() -> String { "HelloWorld!123".to_string() }
pub fn login_password() -> String {
"HelloWorld!123".to_string()
}
pub struct SignUpContext {
pub user_profile: UserProfile,
@ -365,4 +371,6 @@ fn sign_in(dispatch: Arc<EventDispatcher>) -> UserProfile {
}
#[allow(dead_code)]
fn logout(dispatch: Arc<EventDispatcher>) { let _ = EventDispatcher::sync_send(dispatch, ModuleRequest::new(SignOut)); }
fn logout(dispatch: Arc<EventDispatcher>) {
let _ = EventDispatcher::sync_send(dispatch, ModuleRequest::new(SignOut));
}

View File

@ -20,7 +20,9 @@ pub struct FlowySDKTest {
impl std::ops::Deref for FlowySDKTest {
type Target = FlowySDK;
fn deref(&self) -> &Self::Target { &self.inner }
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl std::default::Default for FlowySDKTest {

View File

@ -12,11 +12,15 @@ pub(crate) enum UserNotification {
}
impl std::default::Default for UserNotification {
fn default() -> Self { UserNotification::Unknown }
fn default() -> Self {
UserNotification::Unknown
}
}
impl std::convert::From<UserNotification> for i32 {
fn from(notification: UserNotification) -> Self { notification as i32 }
fn from(notification: UserNotification) -> Self {
notification as i32
}
}
pub(crate) fn dart_notify(id: &str, ty: UserNotification) -> DartNotifyBuilder {

View File

@ -10,7 +10,9 @@ pub struct UserHttpServer {
config: ClientServerConfiguration,
}
impl UserHttpServer {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
pub fn new(config: ClientServerConfiguration) -> Self {
Self { config }
}
}
impl UserServerAPI for UserHttpServer {
@ -57,7 +59,9 @@ impl UserServerAPI for UserHttpServer {
})
}
fn ws_addr(&self) -> String { self.config.ws_addr() }
fn ws_addr(&self) -> String {
self.config.ws_addr()
}
}
// use crate::notify::*;

View File

@ -35,7 +35,9 @@ impl UserServerAPI for UserServerMock {
})
}
fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
@ -45,5 +47,7 @@ impl UserServerAPI for UserServerMock {
FutureResult::new(async { Ok(UserProfile::default()) })
}
fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() }
fn ws_addr(&self) -> String {
"ws://localhost:8000/ws/".to_owned()
}
}

View File

@ -37,7 +37,7 @@ impl UserDB {
Some(mut write_guard) => {
write_guard.insert(user_id.to_owned(), db);
Ok(())
},
}
}
}
@ -48,7 +48,7 @@ impl UserDB {
set_user_db_init(false, user_id);
write_guard.remove(user_id);
Ok(())
},
}
}
}
@ -67,7 +67,7 @@ impl UserDB {
let _ = self.open_user_db(user_id)?;
set_user_db_init(true, user_id);
}
},
}
}
match DB_MAP.try_read_for(Duration::from_millis(300)) {
@ -75,7 +75,7 @@ impl UserDB {
Some(read_guard) => match read_guard.get(user_id) {
None => {
Err(FlowyError::internal().context("Get connection failed. The database is not initialization"))
},
}
Some(database) => Ok(database.get_pool()),
},
}

Some files were not shown because too many files have changed in this diff Show More