chore: checking workspace state consistent after switching workspace (#5201)

* refactor: getting workspace id

* refactor: check workspace id is match for http response

* refactor: check http repsonse in valid by checing the workspace id

* chore: update log

* chore: fix test

* chore: fix test

* chore: add test

* chore: update test
This commit is contained in:
Nathan.fooo
2024-04-26 09:44:07 +08:00
committed by GitHub
parent 65a289648e
commit cc66147bc0
51 changed files with 980 additions and 575 deletions

View File

@ -1,4 +1,12 @@
use flowy_error::FlowyResult;
pub const USER_SIGN_IN_URL: &str = "sign_in_url";
pub const USER_UUID: &str = "uuid";
pub const USER_EMAIL: &str = "email";
pub const USER_DEVICE_ID: &str = "device_id";
/// Represents a user that is currently using the server.
pub trait ServerUser: Send + Sync {
/// different user might return different workspace id.
fn workspace_id(&self) -> FlowyResult<String>;
}

View File

@ -5,19 +5,26 @@ use client_api::error::ErrorCode::RecordNotFound;
use collab::core::collab::DataSource;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use tracing::error;
use std::sync::Arc;
use tracing::{error, instrument};
use flowy_database_pub::cloud::{CollabDocStateByOid, DatabaseCloudService, DatabaseSnapshot};
use lib_infra::future::FutureResult;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::AFServer;
pub(crate) struct AFCloudDatabaseCloudServiceImpl<T>(pub T);
pub(crate) struct AFCloudDatabaseCloudServiceImpl<T> {
pub inner: T,
pub user: Arc<dyn ServerUser>,
}
impl<T> DatabaseCloudService for AFCloudDatabaseCloudServiceImpl<T>
where
T: AFServer,
{
#[instrument(level = "debug", skip_all)]
fn get_database_object_doc_state(
&self,
object_id: &str,
@ -26,17 +33,25 @@ where
) -> FutureResult<Option<Vec<u8>>, Error> {
let workspace_id = workspace_id.to_string();
let object_id = object_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let params = QueryCollabParams {
workspace_id,
workspace_id: workspace_id.clone(),
inner: QueryCollab {
object_id,
collab_type,
object_id: object_id.clone(),
collab_type: collab_type.clone(),
},
};
match try_get_client?.get_collab(params).await {
Ok(data) => Ok(Some(data.encode_collab.doc_state.to_vec())),
Ok(data) => {
check_request_workspace_id_is_match(
&workspace_id,
&cloned_user,
format!("get database object: {}:{}", object_id, collab_type),
)?;
Ok(Some(data.encode_collab.doc_state.to_vec()))
},
Err(err) => {
if err.code == RecordNotFound {
Ok(None)
@ -48,6 +63,7 @@ where
})
}
#[instrument(level = "debug", skip_all)]
fn batch_get_database_object_doc_state(
&self,
object_ids: Vec<String>,
@ -55,7 +71,8 @@ where
workspace_id: &str,
) -> FutureResult<CollabDocStateByOid, Error> {
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let client = try_get_client?;
let params = object_ids
@ -66,6 +83,11 @@ where
})
.collect();
let results = client.batch_get_collab(&workspace_id, params).await?;
check_request_workspace_id_is_match(
&workspace_id,
&cloned_user,
"batch get database object",
)?;
Ok(
results
.0

View File

@ -4,30 +4,39 @@ use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab_document::document::Document;
use collab_entity::CollabType;
use std::sync::Arc;
use tracing::instrument;
use flowy_document_pub::cloud::*;
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::AFServer;
pub(crate) struct AFCloudDocumentCloudServiceImpl<T>(pub T);
pub(crate) struct AFCloudDocumentCloudServiceImpl<T> {
pub inner: T,
pub user: Arc<dyn ServerUser>,
}
impl<T> DocumentCloudService for AFCloudDocumentCloudServiceImpl<T>
where
T: AFServer,
{
#[instrument(level = "debug", skip_all, fields(document_id = %document_id))]
fn get_document_doc_state(
&self,
document_id: &str,
workspace_id: &str,
) -> FutureResult<Vec<u8>, FlowyError> {
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let document_id = document_id.to_string();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let params = QueryCollabParams {
workspace_id,
workspace_id: workspace_id.clone(),
inner: QueryCollab {
object_id: document_id.to_string(),
collab_type: CollabType::Document,
@ -40,6 +49,13 @@ where
.encode_collab
.doc_state
.to_vec();
check_request_workspace_id_is_match(
&workspace_id,
&cloned_user,
format!("get document doc state:{}", document_id),
)?;
Ok(doc_state)
})
}
@ -53,17 +69,19 @@ where
FutureResult::new(async move { Ok(vec![]) })
}
#[instrument(level = "debug", skip_all)]
fn get_document_data(
&self,
document_id: &str,
workspace_id: &str,
) -> FutureResult<Option<DocumentData>, Error> {
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let document_id = document_id.to_string();
let workspace_id = workspace_id.to_string();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let params = QueryCollabParams {
workspace_id,
workspace_id: workspace_id.clone(),
inner: QueryCollab {
object_id: document_id.clone(),
collab_type: CollabType::Document,
@ -76,6 +94,11 @@ where
.encode_collab
.doc_state
.to_vec();
check_request_workspace_id_is_match(
&workspace_id,
&cloned_user,
format!("Get {} document", document_id),
)?;
let document = Document::from_doc_state(
CollabOrigin::Empty,
DataSource::DocStateV1(doc_state),

View File

@ -6,6 +6,8 @@ use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab_entity::CollabType;
use collab_folder::RepeatedViewIdentifier;
use std::sync::Arc;
use tracing::instrument;
use flowy_error::FlowyError;
use flowy_folder_pub::cloud::{
@ -14,16 +16,21 @@ use flowy_folder_pub::cloud::{
};
use lib_infra::future::FutureResult;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::AFServer;
pub(crate) struct AFCloudFolderCloudServiceImpl<T>(pub T);
pub(crate) struct AFCloudFolderCloudServiceImpl<T> {
pub inner: T,
pub user: Arc<dyn ServerUser>,
}
impl<T> FolderCloudService for AFCloudFolderCloudServiceImpl<T>
where
T: AFServer,
{
fn create_workspace(&self, _uid: i64, name: &str) -> FutureResult<Workspace, Error> {
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let cloned_name = name.to_string();
FutureResult::new(async move {
let client = try_get_client?;
@ -47,7 +54,7 @@ where
fn open_workspace(&self, workspace_id: &str) -> FutureResult<(), Error> {
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
FutureResult::new(async move {
let client = try_get_client?;
let _ = client.open_workspace(&workspace_id).await?;
@ -56,7 +63,7 @@ where
}
fn get_all_workspace(&self) -> FutureResult<Vec<WorkspaceRecord>, Error> {
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
FutureResult::new(async move {
let client = try_get_client?;
let records = client
@ -73,7 +80,7 @@ where
Ok(records)
})
}
#[instrument(level = "debug", skip_all)]
fn get_folder_data(
&self,
workspace_id: &str,
@ -81,7 +88,8 @@ where
) -> FutureResult<Option<FolderData>, Error> {
let uid = *uid;
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let params = QueryCollabParams {
workspace_id: workspace_id.clone(),
@ -97,6 +105,7 @@ where
.encode_collab
.doc_state
.to_vec();
check_request_workspace_id_is_match(&workspace_id, &cloned_user, "get folder data")?;
let folder = Folder::from_collab_doc_state(
uid,
CollabOrigin::Empty,
@ -104,7 +113,7 @@ where
&workspace_id,
vec![],
)?;
Ok(folder.get_folder_data())
Ok(folder.get_folder_data(&workspace_id))
})
}
@ -116,6 +125,7 @@ where
FutureResult::new(async move { Ok(vec![]) })
}
#[instrument(level = "debug", skip_all)]
fn get_folder_doc_state(
&self,
workspace_id: &str,
@ -125,10 +135,11 @@ where
) -> FutureResult<Vec<u8>, Error> {
let object_id = object_id.to_string();
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let params = QueryCollabParams {
workspace_id,
workspace_id: workspace_id.clone(),
inner: QueryCollab {
object_id,
collab_type,
@ -141,6 +152,7 @@ where
.encode_collab
.doc_state
.to_vec();
check_request_workspace_id_is_match(&workspace_id, &cloned_user, "get folder doc state")?;
Ok(doc_state)
})
}
@ -151,7 +163,7 @@ where
objects: Vec<FolderCollabParams>,
) -> FutureResult<(), Error> {
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.inner.try_get_client();
FutureResult::new(async move {
let params = objects
.into_iter()

View File

@ -9,3 +9,4 @@ mod document;
mod file_storage;
mod folder;
mod user;
mod util;

View File

@ -13,6 +13,7 @@ use client_api::entity::{QueryCollab, QueryCollabParams};
use client_api::{Client, ClientConfiguration};
use collab_entity::{CollabObject, CollabType};
use parking_lot::RwLock;
use tracing::instrument;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_user_pub::cloud::{UserCloudService, UserCollabParams, UserUpdate, UserUpdateReceiver};
@ -24,11 +25,12 @@ use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use uuid::Uuid;
use crate::af_cloud::define::USER_SIGN_IN_URL;
use crate::af_cloud::define::{ServerUser, USER_SIGN_IN_URL};
use crate::af_cloud::impls::user::dto::{
af_update_from_update_params, from_af_workspace_member, to_af_role, user_profile_from_af_profile,
};
use crate::af_cloud::impls::user::util::encryption_type_from_profile;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::{AFCloudClient, AFServer};
use super::dto::{from_af_workspace_invitation_status, to_workspace_invitation_status};
@ -36,13 +38,19 @@ use super::dto::{from_af_workspace_invitation_status, to_workspace_invitation_st
pub(crate) struct AFCloudUserAuthServiceImpl<T> {
server: T,
user_change_recv: RwLock<Option<tokio::sync::mpsc::Receiver<UserUpdate>>>,
user: Arc<dyn ServerUser>,
}
impl<T> AFCloudUserAuthServiceImpl<T> {
pub(crate) fn new(server: T, user_change_recv: tokio::sync::mpsc::Receiver<UserUpdate>) -> Self {
pub(crate) fn new(
server: T,
user_change_recv: tokio::sync::mpsc::Receiver<UserUpdate>,
user: Arc<dyn ServerUser>,
) -> Self {
Self {
server,
user_change_recv: RwLock::new(Some(user_change_recv)),
user,
}
}
}
@ -166,16 +174,27 @@ where
})
}
#[instrument(level = "debug", skip_all)]
fn get_user_profile(
&self,
_credential: UserCredentials,
) -> FutureResult<UserProfile, FlowyError> {
let try_get_client = self.server.try_get_client();
let cloned_user = self.user.clone();
FutureResult::new(async move {
let expected_workspace_id = cloned_user.workspace_id()?;
let client = try_get_client?;
let profile = client.get_profile().await?;
let token = client.get_token()?;
let profile = user_profile_from_af_profile(token, profile)?;
// Discard the response if the user has switched to a new workspace. This avoids updating the
// user profile with potentially outdated information when the workspace ID no longer matches.
check_request_workspace_id_is_match(
&expected_workspace_id,
&cloned_user,
"get user profile",
)?;
Ok(profile)
})
}
@ -315,6 +334,7 @@ where
})
}
#[instrument(level = "debug", skip_all)]
fn get_user_awareness_doc_state(
&self,
_uid: i64,
@ -324,16 +344,21 @@ where
let workspace_id = workspace_id.to_string();
let object_id = object_id.to_string();
let try_get_client = self.server.try_get_client();
FutureResult::new(async {
let cloned_user = self.user.clone();
FutureResult::new(async move {
let params = QueryCollabParams {
workspace_id,
workspace_id: workspace_id.clone(),
inner: QueryCollab {
object_id,
collab_type: CollabType::UserAwareness,
},
};
let resp = try_get_client?.get_collab(params).await?;
check_request_workspace_id_is_match(
&workspace_id,
&cloned_user,
"get user awareness object",
)?;
Ok(resp.encode_collab.doc_state.to_vec())
})
}

View File

@ -0,0 +1,29 @@
use crate::af_cloud::define::ServerUser;
use flowy_error::{FlowyError, FlowyResult};
use std::sync::Arc;
use tracing::warn;
/// Validates the workspace_id provided in the request.
/// It checks that the workspace_id from the request matches the current user's active workspace_id.
/// This ensures that the operation is being performed in the correct workspace context, enhancing security.
pub fn check_request_workspace_id_is_match(
expected_workspace_id: &str,
user: &Arc<dyn ServerUser>,
action: impl AsRef<str>,
) -> FlowyResult<()> {
let actual_workspace_id = user.workspace_id()?;
if expected_workspace_id != actual_workspace_id {
warn!(
"{}, expect workspace_id: {}, actual workspace_id: {}",
action.as_ref(),
expected_workspace_id,
actual_workspace_id
);
return Err(
FlowyError::internal()
.with_context("Current workspace was changed when processing the request"),
);
}
Ok(())
}

View File

@ -17,6 +17,7 @@ use tokio_stream::wrappers::WatchStream;
use tracing::{error, event, info, warn};
use uuid::Uuid;
use crate::af_cloud::define::ServerUser;
use flowy_database_pub::cloud::DatabaseCloudService;
use flowy_document_pub::cloud::DocumentCloudService;
use flowy_error::{ErrorCode, FlowyError};
@ -42,6 +43,7 @@ pub struct AppFlowyCloudServer {
network_reachable: Arc<AtomicBool>,
pub device_id: String,
ws_client: Arc<WSClient>,
user: Arc<dyn ServerUser>,
}
impl AppFlowyCloudServer {
@ -50,6 +52,7 @@ impl AppFlowyCloudServer {
enable_sync: bool,
mut device_id: String,
client_version: &str,
user: Arc<dyn ServerUser>,
) -> Self {
// The device id can't be empty, so we generate a new one if it is.
if device_id.is_empty() {
@ -83,6 +86,7 @@ impl AppFlowyCloudServer {
network_reachable,
device_id,
ws_client,
user,
}
}
@ -159,28 +163,41 @@ impl AppFlowyServer for AppFlowyCloudServer {
}
});
Arc::new(AFCloudUserAuthServiceImpl::new(server, rx))
Arc::new(AFCloudUserAuthServiceImpl::new(
server,
rx,
self.user.clone(),
))
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
let server = AFServerImpl {
client: self.get_client(),
};
Arc::new(AFCloudFolderCloudServiceImpl(server))
Arc::new(AFCloudFolderCloudServiceImpl {
inner: server,
user: self.user.clone(),
})
}
fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
let server = AFServerImpl {
client: self.get_client(),
};
Arc::new(AFCloudDatabaseCloudServiceImpl(server))
Arc::new(AFCloudDatabaseCloudServiceImpl {
inner: server,
user: self.user.clone(),
})
}
fn document_service(&self) -> Arc<dyn DocumentCloudService> {
let server = AFServerImpl {
client: self.get_client(),
};
Arc::new(AFCloudDocumentCloudServiceImpl(server))
Arc::new(AFCloudDocumentCloudServiceImpl {
inner: server,
user: self.user.clone(),
})
}
fn subscribe_ws_state(&self) -> Option<WSConnectStateReceiver> {

View File

@ -109,7 +109,7 @@ where
&workspace_id,
vec![],
)?;
Ok(folder.get_folder_data())
Ok(folder.get_folder_data(&workspace_id))
})
}