mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
config folder ws
This commit is contained in:
parent
3964508cd8
commit
13aba928c3
1
backend/Cargo.lock
generated
1
backend/Cargo.lock
generated
@ -1262,6 +1262,7 @@ dependencies = [
|
||||
"flowy-derive",
|
||||
"flowy-document",
|
||||
"flowy-error",
|
||||
"flowy-sync",
|
||||
"futures",
|
||||
"futures-core",
|
||||
"lazy_static",
|
||||
|
@ -13,8 +13,8 @@ use crate::{
|
||||
},
|
||||
context::AppContext,
|
||||
services::{
|
||||
core::{app::router as app, trash::router as trash, view::router as view, workspace::router as workspace},
|
||||
document::router as doc,
|
||||
folder::{app::router as app, trash::router as trash, view::router as view, workspace::router as workspace},
|
||||
user::router as user,
|
||||
web_socket::WSServer,
|
||||
},
|
||||
@ -131,7 +131,7 @@ fn user_scope() -> Scope {
|
||||
}
|
||||
|
||||
pub async fn init_app_context(configuration: &Settings) -> AppContext {
|
||||
let _ = crate::services::core::log::Builder::new("flowy-server")
|
||||
let _ = crate::services::log::Builder::new("flowy-server")
|
||||
.env_filter("Trace")
|
||||
.build();
|
||||
let pg_pool = get_connection_pool(&configuration.database)
|
||||
|
@ -5,9 +5,12 @@ use crate::services::{
|
||||
use actix::Addr;
|
||||
use actix_web::web::Data;
|
||||
|
||||
use crate::services::document::{
|
||||
persistence::DocumentKVPersistence,
|
||||
ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence},
|
||||
use crate::services::{
|
||||
document::{
|
||||
persistence::DocumentKVPersistence,
|
||||
ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence},
|
||||
},
|
||||
folder::ws_receiver::make_folder_ws_receiver,
|
||||
};
|
||||
use flowy_collaboration::server_document::ServerDocumentManager;
|
||||
use lib_ws::WSModule;
|
||||
@ -35,6 +38,10 @@ impl AppContext {
|
||||
|
||||
let document_ws_receiver = make_document_ws_receiver(flowy_persistence.clone(), document_manager.clone());
|
||||
ws_receivers.set(WSModule::Doc, document_ws_receiver);
|
||||
|
||||
let folder_ws_receiver = make_folder_ws_receiver(flowy_persistence.clone());
|
||||
ws_receivers.set(WSModule::Folder, folder_ws_receiver);
|
||||
|
||||
AppContext {
|
||||
ws_server,
|
||||
persistence: Data::new(flowy_persistence),
|
||||
|
@ -13,13 +13,13 @@ use flowy_collaboration::{
|
||||
ClientRevisionWSDataType as ClientRevisionWSDataTypePB,
|
||||
Revision as RevisionPB,
|
||||
},
|
||||
server_document::{RevisionUser, ServerDocumentManager, SyncResponse},
|
||||
server_document::{RevisionSyncResponse, RevisionUser, ServerDocumentManager},
|
||||
};
|
||||
use futures::stream::StreamExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
pub enum WSActorMessage {
|
||||
pub enum DocumentWSActorMessage {
|
||||
ClientData {
|
||||
client_data: WSClientData,
|
||||
persistence: Arc<FlowyPersistence>,
|
||||
@ -28,12 +28,12 @@ pub enum WSActorMessage {
|
||||
}
|
||||
|
||||
pub struct DocumentWebSocketActor {
|
||||
receiver: Option<mpsc::Receiver<WSActorMessage>>,
|
||||
receiver: Option<mpsc::Receiver<DocumentWSActorMessage>>,
|
||||
doc_manager: Arc<ServerDocumentManager>,
|
||||
}
|
||||
|
||||
impl DocumentWebSocketActor {
|
||||
pub fn new(receiver: mpsc::Receiver<WSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
|
||||
pub fn new(receiver: mpsc::Receiver<DocumentWSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
|
||||
Self {
|
||||
receiver: Some(receiver),
|
||||
doc_manager: manager,
|
||||
@ -44,7 +44,7 @@ impl DocumentWebSocketActor {
|
||||
let mut receiver = self
|
||||
.receiver
|
||||
.take()
|
||||
.expect("DocActor's receiver should only take one time");
|
||||
.expect("DocumentWebSocketActor's receiver should only take one time");
|
||||
|
||||
let stream = stream! {
|
||||
loop {
|
||||
@ -58,19 +58,19 @@ impl DocumentWebSocketActor {
|
||||
stream.for_each(|msg| self.handle_message(msg)).await;
|
||||
}
|
||||
|
||||
async fn handle_message(&self, msg: WSActorMessage) {
|
||||
async fn handle_message(&self, msg: DocumentWSActorMessage) {
|
||||
match msg {
|
||||
WSActorMessage::ClientData {
|
||||
DocumentWSActorMessage::ClientData {
|
||||
client_data,
|
||||
persistence: _,
|
||||
ret,
|
||||
} => {
|
||||
let _ = ret.send(self.handle_client_data(client_data).await);
|
||||
let _ = ret.send(self.handle_document_data(client_data).await);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_client_data(&self, client_data: WSClientData) -> Result<()> {
|
||||
async fn handle_document_data(&self, client_data: WSClientData) -> Result<()> {
|
||||
let WSClientData { user, socket, data } = client_data;
|
||||
let document_client_data = spawn_blocking(move || parse_from_bytes::<ClientRevisionWSDataPB>(&data))
|
||||
.await
|
||||
@ -83,8 +83,7 @@ impl DocumentWebSocketActor {
|
||||
document_client_data.ty
|
||||
);
|
||||
|
||||
let user = Arc::new(HttpDocumentUser { user, socket });
|
||||
|
||||
let user = Arc::new(DocumentRevisionUser { user, socket });
|
||||
match &document_client_data.ty {
|
||||
ClientRevisionWSDataTypePB::ClientPushRev => {
|
||||
let _ = self
|
||||
@ -115,34 +114,34 @@ fn verify_md5(revision: &RevisionPB) -> Result<()> {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HttpDocumentUser {
|
||||
pub struct DocumentRevisionUser {
|
||||
pub user: Arc<WSUser>,
|
||||
pub(crate) socket: Socket,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for HttpDocumentUser {
|
||||
impl std::fmt::Debug for DocumentRevisionUser {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("ServerDocUser")
|
||||
f.debug_struct("DocumentRevisionUser")
|
||||
.field("user", &self.user)
|
||||
.field("socket", &self.socket)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl RevisionUser for HttpDocumentUser {
|
||||
impl RevisionUser for DocumentRevisionUser {
|
||||
fn user_id(&self) -> String { self.user.id().to_string() }
|
||||
|
||||
fn receive(&self, resp: SyncResponse) {
|
||||
fn receive(&self, resp: RevisionSyncResponse) {
|
||||
let result = match resp {
|
||||
SyncResponse::Pull(data) => {
|
||||
RevisionSyncResponse::Pull(data) => {
|
||||
let msg: WebSocketMessage = data.into();
|
||||
self.socket.try_send(msg).map_err(internal_error)
|
||||
},
|
||||
SyncResponse::Push(data) => {
|
||||
RevisionSyncResponse::Push(data) => {
|
||||
let msg: WebSocketMessage = data.into();
|
||||
self.socket.try_send(msg).map_err(internal_error)
|
||||
},
|
||||
SyncResponse::Ack(data) => {
|
||||
RevisionSyncResponse::Ack(data) => {
|
||||
let msg: WebSocketMessage = data.into();
|
||||
self.socket.try_send(msg).map_err(internal_error)
|
||||
},
|
||||
@ -150,7 +149,7 @@ impl RevisionUser for HttpDocumentUser {
|
||||
|
||||
match result {
|
||||
Ok(_) => {},
|
||||
Err(e) => log::error!("[ServerDocUser]: {}", e),
|
||||
Err(e) => log::error!("[DocumentRevisionUser]: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ use crate::{
|
||||
services::{
|
||||
document::{
|
||||
persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence},
|
||||
ws_actor::{DocumentWebSocketActor, WSActorMessage},
|
||||
ws_actor::{DocumentWSActorMessage, DocumentWebSocketActor},
|
||||
},
|
||||
web_socket::{WSClientData, WebSocketReceiver},
|
||||
},
|
||||
@ -33,7 +33,7 @@ pub fn make_document_ws_receiver(
|
||||
persistence: Arc<FlowyPersistence>,
|
||||
document_manager: Arc<ServerDocumentManager>,
|
||||
) -> Arc<DocumentWebSocketReceiver> {
|
||||
let (ws_sender, rx) = tokio::sync::mpsc::channel(100);
|
||||
let (ws_sender, rx) = tokio::sync::mpsc::channel(1000);
|
||||
let actor = DocumentWebSocketActor::new(rx, document_manager);
|
||||
tokio::task::spawn(actor.run());
|
||||
|
||||
@ -41,12 +41,12 @@ pub fn make_document_ws_receiver(
|
||||
}
|
||||
|
||||
pub struct DocumentWebSocketReceiver {
|
||||
ws_sender: mpsc::Sender<WSActorMessage>,
|
||||
ws_sender: mpsc::Sender<DocumentWSActorMessage>,
|
||||
persistence: Arc<FlowyPersistence>,
|
||||
}
|
||||
|
||||
impl DocumentWebSocketReceiver {
|
||||
pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<WSActorMessage>) -> Self {
|
||||
pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<DocumentWSActorMessage>) -> Self {
|
||||
Self { ws_sender, persistence }
|
||||
}
|
||||
}
|
||||
@ -58,7 +58,7 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
|
||||
let persistence = self.persistence.clone();
|
||||
|
||||
actix_rt::spawn(async move {
|
||||
let msg = WSActorMessage::ClientData {
|
||||
let msg = DocumentWSActorMessage::ClientData {
|
||||
client_data: data,
|
||||
persistence,
|
||||
ret,
|
||||
@ -82,7 +82,6 @@ impl Debug for HttpDocumentCloudPersistence {
|
||||
}
|
||||
|
||||
impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
|
||||
fn enable_sync(&self) -> bool { true }
|
||||
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
|
||||
let params = DocumentId {
|
||||
doc_id: doc_id.to_string(),
|
||||
|
@ -1,8 +1,8 @@
|
||||
use crate::services::core::view::read_view_belong_to_id;
|
||||
use crate::services::folder::view::read_view_belong_to_id;
|
||||
|
||||
use crate::{
|
||||
entities::logged_user::LoggedUser,
|
||||
services::core::{app::persistence::*, trash::read_trash_ids},
|
||||
services::folder::{app::persistence::*, trash::read_trash_ids},
|
||||
util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
|
||||
};
|
||||
use backend_service::errors::{invalid_params, ServerError};
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
entities::logged_user::LoggedUser,
|
||||
services::core::app::{
|
||||
services::folder::app::{
|
||||
controller::{create_app, delete_app, read_app, update_app},
|
||||
persistence::check_app_id,
|
||||
},
|
@ -1,5 +1,6 @@
|
||||
pub mod app;
|
||||
pub(crate) mod log;
|
||||
pub mod trash;
|
||||
pub mod view;
|
||||
pub mod workspace;
|
||||
pub(crate) mod ws_actor;
|
||||
pub(crate) mod ws_receiver;
|
@ -1,4 +1,4 @@
|
||||
use crate::services::core::{app::persistence::AppTable, view::persistence::ViewTable};
|
||||
use crate::services::folder::{app::persistence::AppTable, view::persistence::ViewTable};
|
||||
use flowy_core_data_model::protobuf::{Trash, TrashType};
|
||||
|
||||
pub(crate) const TRASH_TABLE: &str = "trash_table";
|
@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
context::FlowyPersistence,
|
||||
entities::logged_user::LoggedUser,
|
||||
services::core::trash::{create_trash, delete_all_trash, delete_trash, read_trash},
|
||||
services::folder::trash::{create_trash, delete_all_trash, delete_trash, read_trash},
|
||||
util::serde_ext::parse_from_payload,
|
||||
};
|
||||
use ::protobuf::ProtobufEnum;
|
@ -1,12 +1,12 @@
|
||||
use crate::{
|
||||
entities::logged_user::LoggedUser,
|
||||
services::{
|
||||
core::{
|
||||
document::persistence::DocumentKVPersistence,
|
||||
folder::{
|
||||
app::controller::{delete_app, read_app_table},
|
||||
trash::persistence::{TrashTable, TRASH_TABLE},
|
||||
view::{delete_view, read_view_table},
|
||||
},
|
||||
document::persistence::DocumentKVPersistence,
|
||||
},
|
||||
util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
|
||||
};
|
@ -1,8 +1,8 @@
|
||||
use crate::{
|
||||
entities::logged_user::LoggedUser,
|
||||
services::{
|
||||
core::{trash::read_trash_ids, view::persistence::*},
|
||||
document::persistence::{create_document, delete_document, DocumentKVPersistence},
|
||||
folder::{trash::read_trash_ids, view::persistence::*},
|
||||
},
|
||||
util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
|
||||
};
|
@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
context::FlowyPersistence,
|
||||
entities::logged_user::LoggedUser,
|
||||
services::core::view::{
|
||||
services::folder::view::{
|
||||
create_view,
|
||||
delete_view,
|
||||
persistence::{check_view_id, check_view_ids},
|
@ -1,7 +1,7 @@
|
||||
use super::persistence::NewWorkspaceBuilder;
|
||||
use crate::{
|
||||
entities::logged_user::LoggedUser,
|
||||
services::core::{
|
||||
services::folder::{
|
||||
app::{controller::read_app, persistence::AppTable},
|
||||
workspace::persistence::*,
|
||||
},
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
entities::logged_user::LoggedUser,
|
||||
services::core::workspace::{
|
||||
services::folder::workspace::{
|
||||
create_workspace,
|
||||
delete_workspace,
|
||||
persistence::check_workspace_id,
|
132
backend/src/services/folder/ws_actor.rs
Normal file
132
backend/src/services/folder/ws_actor.rs
Normal file
@ -0,0 +1,132 @@
|
||||
use crate::{
|
||||
context::FlowyPersistence,
|
||||
services::web_socket::{entities::Socket, WSClientData, WSUser, WebSocketMessage},
|
||||
util::serde_ext::parse_from_bytes,
|
||||
};
|
||||
use actix_rt::task::spawn_blocking;
|
||||
use async_stream::stream;
|
||||
use backend_service::errors::{internal_error, Result};
|
||||
use flowy_collaboration::{
|
||||
protobuf::{
|
||||
ClientRevisionWSData as ClientRevisionWSDataPB,
|
||||
ClientRevisionWSDataType as ClientRevisionWSDataTypePB,
|
||||
},
|
||||
server_document::{RevisionSyncResponse, RevisionUser, ServerDocumentManager},
|
||||
};
|
||||
use futures::stream::StreamExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
pub enum FolderWSActorMessage {
|
||||
ClientData {
|
||||
client_data: WSClientData,
|
||||
persistence: Arc<FlowyPersistence>,
|
||||
ret: oneshot::Sender<Result<()>>,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct FolderWebSocketActor {
|
||||
receiver: Option<mpsc::Receiver<FolderWSActorMessage>>,
|
||||
}
|
||||
|
||||
impl FolderWebSocketActor {
|
||||
pub fn new(receiver: mpsc::Receiver<FolderWSActorMessage>) -> Self {
|
||||
Self {
|
||||
receiver: Some(receiver),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
let mut receiver = self
|
||||
.receiver
|
||||
.take()
|
||||
.expect("FolderWebSocketActor's receiver should only take one time");
|
||||
let stream = stream! {
|
||||
loop {
|
||||
match receiver.recv().await {
|
||||
Some(msg) => yield msg,
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
};
|
||||
stream.for_each(|msg| self.handle_message(msg)).await;
|
||||
}
|
||||
|
||||
async fn handle_message(&self, msg: FolderWSActorMessage) {
|
||||
match msg {
|
||||
FolderWSActorMessage::ClientData {
|
||||
client_data,
|
||||
persistence: _,
|
||||
ret,
|
||||
} => {
|
||||
let _ = ret.send(self.handle_folder_data(client_data).await);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_folder_data(&self, client_data: WSClientData) -> Result<()> {
|
||||
let WSClientData { user, socket, data } = client_data;
|
||||
let folder_client_data = spawn_blocking(move || parse_from_bytes::<ClientRevisionWSDataPB>(&data))
|
||||
.await
|
||||
.map_err(internal_error)??;
|
||||
|
||||
tracing::debug!(
|
||||
"[DocumentWebSocketActor]: receive: {}:{}, {:?}",
|
||||
folder_client_data.object_id,
|
||||
folder_client_data.data_id,
|
||||
folder_client_data.ty
|
||||
);
|
||||
|
||||
let _user = Arc::new(FolderRevisionUser { user, socket });
|
||||
match &folder_client_data.ty {
|
||||
ClientRevisionWSDataTypePB::ClientPushRev => {
|
||||
todo!()
|
||||
},
|
||||
ClientRevisionWSDataTypePB::ClientPing => {
|
||||
todo!()
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FolderRevisionUser {
|
||||
pub user: Arc<WSUser>,
|
||||
pub(crate) socket: Socket,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FolderRevisionUser {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("FolderRevisionUser")
|
||||
.field("user", &self.user)
|
||||
.field("socket", &self.socket)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl RevisionUser for FolderRevisionUser {
|
||||
fn user_id(&self) -> String { self.user.id().to_string() }
|
||||
|
||||
fn receive(&self, resp: RevisionSyncResponse) {
|
||||
let result = match resp {
|
||||
RevisionSyncResponse::Pull(data) => {
|
||||
let msg: WebSocketMessage = data.into();
|
||||
self.socket.try_send(msg).map_err(internal_error)
|
||||
},
|
||||
RevisionSyncResponse::Push(data) => {
|
||||
let msg: WebSocketMessage = data.into();
|
||||
self.socket.try_send(msg).map_err(internal_error)
|
||||
},
|
||||
RevisionSyncResponse::Ack(data) => {
|
||||
let msg: WebSocketMessage = data.into();
|
||||
self.socket.try_send(msg).map_err(internal_error)
|
||||
},
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(_) => {},
|
||||
Err(e) => log::error!("[FolderRevisionUser]: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
53
backend/src/services/folder/ws_receiver.rs
Normal file
53
backend/src/services/folder/ws_receiver.rs
Normal file
@ -0,0 +1,53 @@
|
||||
use crate::{
|
||||
context::FlowyPersistence,
|
||||
services::{
|
||||
folder::ws_actor::{FolderWSActorMessage, FolderWebSocketActor},
|
||||
web_socket::{WSClientData, WebSocketReceiver},
|
||||
},
|
||||
};
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
pub fn make_folder_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<FolderWebSocketReceiver> {
|
||||
let (ws_sender, rx) = tokio::sync::mpsc::channel(1000);
|
||||
let actor = FolderWebSocketActor::new(rx);
|
||||
tokio::task::spawn(actor.run());
|
||||
Arc::new(FolderWebSocketReceiver::new(persistence, ws_sender))
|
||||
}
|
||||
|
||||
pub struct FolderWebSocketReceiver {
|
||||
ws_sender: mpsc::Sender<FolderWSActorMessage>,
|
||||
persistence: Arc<FlowyPersistence>,
|
||||
}
|
||||
|
||||
impl FolderWebSocketReceiver {
|
||||
pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<FolderWSActorMessage>) -> Self {
|
||||
Self { ws_sender, persistence }
|
||||
}
|
||||
}
|
||||
|
||||
impl WebSocketReceiver for FolderWebSocketReceiver {
|
||||
fn receive(&self, data: WSClientData) {
|
||||
let (ret, rx) = oneshot::channel();
|
||||
let sender = self.ws_sender.clone();
|
||||
let persistence = self.persistence.clone();
|
||||
|
||||
actix_rt::spawn(async move {
|
||||
let msg = FolderWSActorMessage::ClientData {
|
||||
client_data: data,
|
||||
persistence,
|
||||
ret,
|
||||
};
|
||||
|
||||
match sender.send(msg).await {
|
||||
Ok(_) => {},
|
||||
Err(e) => log::error!("{}", e),
|
||||
}
|
||||
match rx.await {
|
||||
Ok(_) => {},
|
||||
Err(e) => log::error!("{:?}", e),
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
pub mod core;
|
||||
pub mod document;
|
||||
pub mod folder;
|
||||
pub mod kv;
|
||||
pub(crate) mod log;
|
||||
pub mod user;
|
||||
pub mod web_socket;
|
||||
|
@ -63,14 +63,14 @@ struct ScriptContext {
|
||||
impl ScriptContext {
|
||||
async fn new(client_sdk: FlowySDKTest, server: TestServer) -> Self {
|
||||
let user_session = client_sdk.user_session.clone();
|
||||
let ws_manager = client_sdk.ws_conn.clone();
|
||||
let ws_conn = client_sdk.ws_conn.clone();
|
||||
let doc_id = create_doc(&client_sdk).await;
|
||||
|
||||
Self {
|
||||
client_editor: None,
|
||||
client_sdk,
|
||||
client_user_session: user_session,
|
||||
ws_conn: ws_manager,
|
||||
ws_conn,
|
||||
server,
|
||||
doc_id,
|
||||
}
|
||||
|
@ -11,9 +11,11 @@ import 'package:protobuf/protobuf.dart' as $pb;
|
||||
|
||||
class WSModule extends $pb.ProtobufEnum {
|
||||
static const WSModule Doc = WSModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc');
|
||||
static const WSModule Folder = WSModule._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Folder');
|
||||
|
||||
static const $core.List<WSModule> values = <WSModule> [
|
||||
Doc,
|
||||
Folder,
|
||||
];
|
||||
|
||||
static final $core.Map<$core.int, WSModule> _byValue = $pb.ProtobufEnum.initByValue(values);
|
||||
|
@ -13,11 +13,12 @@ const WSModule$json = const {
|
||||
'1': 'WSModule',
|
||||
'2': const [
|
||||
const {'1': 'Doc', '2': 0},
|
||||
const {'1': 'Folder', '2': 1},
|
||||
],
|
||||
};
|
||||
|
||||
/// Descriptor for `WSModule`. Decode as a `google.protobuf.EnumDescriptorProto`.
|
||||
final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQAA==');
|
||||
final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQABIKCgZGb2xkZXIQAQ==');
|
||||
@$core.Deprecated('Use webSocketRawMessageDescriptor instead')
|
||||
const WebSocketRawMessage$json = const {
|
||||
'1': 'WebSocketRawMessage',
|
||||
|
@ -18,6 +18,7 @@ flowy-error = { path = "../flowy-error", features = ["db", "backend"]}
|
||||
dart-notify = { path = "../dart-notify" }
|
||||
lib-dispatch = { path = "../lib-dispatch" }
|
||||
lib-sqlite = { path = "../lib-sqlite" }
|
||||
flowy-sync = { path = "../flowy-sync" }
|
||||
|
||||
parking_lot = "0.11"
|
||||
protobuf = {version = "2.18.0"}
|
||||
|
@ -1,6 +1,9 @@
|
||||
use bytes::Bytes;
|
||||
use chrono::Utc;
|
||||
use flowy_collaboration::client_document::default::{initial_delta, initial_read_me};
|
||||
use flowy_core_data_model::{entities::view::CreateViewParams, user_default};
|
||||
use flowy_document::context::DocumentContext;
|
||||
use flowy_sync::RevisionWebSocket;
|
||||
use lazy_static::lazy_static;
|
||||
use parking_lot::RwLock;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
@ -23,7 +26,7 @@ lazy_static! {
|
||||
static ref INIT_WORKSPACE: RwLock<HashMap<String, bool>> = RwLock::new(HashMap::new());
|
||||
}
|
||||
|
||||
pub struct CoreContext {
|
||||
pub struct FolderManager {
|
||||
pub user: Arc<dyn WorkspaceUser>,
|
||||
pub(crate) cloud_service: Arc<dyn WorkspaceCloudService>,
|
||||
pub(crate) persistence: Arc<FlowyCorePersistence>,
|
||||
@ -31,22 +34,49 @@ pub struct CoreContext {
|
||||
pub(crate) app_controller: Arc<AppController>,
|
||||
pub(crate) view_controller: Arc<ViewController>,
|
||||
pub(crate) trash_controller: Arc<TrashController>,
|
||||
ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
}
|
||||
|
||||
impl CoreContext {
|
||||
impl FolderManager {
|
||||
pub(crate) fn new(
|
||||
user: Arc<dyn WorkspaceUser>,
|
||||
cloud_service: Arc<dyn WorkspaceCloudService>,
|
||||
persistence: Arc<FlowyCorePersistence>,
|
||||
workspace_controller: Arc<WorkspaceController>,
|
||||
app_controller: Arc<AppController>,
|
||||
view_controller: Arc<ViewController>,
|
||||
trash_controller: Arc<TrashController>,
|
||||
flowy_document: Arc<DocumentContext>,
|
||||
ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
) -> Self {
|
||||
if let Ok(token) = user.token() {
|
||||
INIT_WORKSPACE.write().insert(token, false);
|
||||
}
|
||||
|
||||
let trash_controller = Arc::new(TrashController::new(
|
||||
persistence.clone(),
|
||||
cloud_service.clone(),
|
||||
user.clone(),
|
||||
));
|
||||
|
||||
let view_controller = Arc::new(ViewController::new(
|
||||
user.clone(),
|
||||
persistence.clone(),
|
||||
cloud_service.clone(),
|
||||
trash_controller.clone(),
|
||||
flowy_document,
|
||||
));
|
||||
|
||||
let app_controller = Arc::new(AppController::new(
|
||||
user.clone(),
|
||||
persistence.clone(),
|
||||
trash_controller.clone(),
|
||||
cloud_service.clone(),
|
||||
));
|
||||
|
||||
let workspace_controller = Arc::new(WorkspaceController::new(
|
||||
user.clone(),
|
||||
persistence.clone(),
|
||||
trash_controller.clone(),
|
||||
cloud_service.clone(),
|
||||
));
|
||||
|
||||
Self {
|
||||
user,
|
||||
cloud_service,
|
||||
@ -55,6 +85,7 @@ impl CoreContext {
|
||||
app_controller,
|
||||
view_controller,
|
||||
trash_controller,
|
||||
ws_sender,
|
||||
}
|
||||
}
|
||||
|
||||
@ -67,6 +98,8 @@ impl CoreContext {
|
||||
// }
|
||||
// }
|
||||
|
||||
pub async fn did_receive_ws_data(&self, _data: Bytes) {}
|
||||
|
||||
pub async fn user_did_sign_in(&self, token: &str) -> FlowyResult<()> {
|
||||
log::debug!("workspace initialize after sign in");
|
||||
let _ = self.init(token).await?;
|
@ -10,7 +10,7 @@ mod macros;
|
||||
#[macro_use]
|
||||
extern crate flowy_database;
|
||||
|
||||
pub mod context;
|
||||
pub mod controller;
|
||||
mod dart_notification;
|
||||
pub mod protobuf;
|
||||
mod util;
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
context::CoreContext,
|
||||
controller::FolderManager,
|
||||
entities::{
|
||||
app::{App, AppId, CreateAppParams, UpdateAppParams},
|
||||
trash::{RepeatedTrash, RepeatedTrashId},
|
||||
@ -14,14 +14,11 @@ use crate::{
|
||||
trash::event_handler::*,
|
||||
view::event_handler::*,
|
||||
workspace::event_handler::*,
|
||||
AppController,
|
||||
TrashController,
|
||||
ViewController,
|
||||
WorkspaceController,
|
||||
},
|
||||
};
|
||||
use flowy_database::DBConnection;
|
||||
use flowy_document::context::DocumentContext;
|
||||
use flowy_sync::RevisionWebSocket;
|
||||
use lib_dispatch::prelude::*;
|
||||
use lib_infra::future::FutureResult;
|
||||
use lib_sqlite::ConnectionPool;
|
||||
@ -44,61 +41,32 @@ pub trait WorkspaceDatabase: Send + Sync {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_core(
|
||||
pub fn init_folder(
|
||||
user: Arc<dyn WorkspaceUser>,
|
||||
database: Arc<dyn WorkspaceDatabase>,
|
||||
flowy_document: Arc<DocumentContext>,
|
||||
cloud_service: Arc<dyn WorkspaceCloudService>,
|
||||
) -> Arc<CoreContext> {
|
||||
ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
) -> Arc<FolderManager> {
|
||||
let persistence = Arc::new(FlowyCorePersistence::new(database.clone()));
|
||||
|
||||
let trash_controller = Arc::new(TrashController::new(
|
||||
persistence.clone(),
|
||||
cloud_service.clone(),
|
||||
user.clone(),
|
||||
));
|
||||
|
||||
let view_controller = Arc::new(ViewController::new(
|
||||
user.clone(),
|
||||
persistence.clone(),
|
||||
cloud_service.clone(),
|
||||
trash_controller.clone(),
|
||||
flowy_document,
|
||||
));
|
||||
|
||||
let app_controller = Arc::new(AppController::new(
|
||||
user.clone(),
|
||||
persistence.clone(),
|
||||
trash_controller.clone(),
|
||||
cloud_service.clone(),
|
||||
));
|
||||
|
||||
let workspace_controller = Arc::new(WorkspaceController::new(
|
||||
user.clone(),
|
||||
persistence.clone(),
|
||||
trash_controller.clone(),
|
||||
cloud_service.clone(),
|
||||
));
|
||||
|
||||
Arc::new(CoreContext::new(
|
||||
Arc::new(FolderManager::new(
|
||||
user,
|
||||
cloud_service,
|
||||
persistence,
|
||||
workspace_controller,
|
||||
app_controller,
|
||||
view_controller,
|
||||
trash_controller,
|
||||
flowy_document,
|
||||
ws_sender,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn create(core: Arc<CoreContext>) -> Module {
|
||||
pub fn create(folder: Arc<FolderManager>) -> Module {
|
||||
let mut module = Module::new()
|
||||
.name("Flowy-Workspace")
|
||||
.data(core.workspace_controller.clone())
|
||||
.data(core.app_controller.clone())
|
||||
.data(core.view_controller.clone())
|
||||
.data(core.trash_controller.clone())
|
||||
.data(core.clone());
|
||||
.data(folder.workspace_controller.clone())
|
||||
.data(folder.app_controller.clone())
|
||||
.data(folder.view_controller.clone())
|
||||
.data(folder.trash_controller.clone())
|
||||
.data(folder.clone());
|
||||
|
||||
module = module
|
||||
.event(WorkspaceEvent::CreateWorkspace, create_workspace_handler)
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
context::CoreContext,
|
||||
controller::FolderManager,
|
||||
dart_notification::{send_dart_notification, WorkspaceNotification},
|
||||
errors::FlowyError,
|
||||
services::{get_current_workspace, read_local_workspace_apps, WorkspaceController},
|
||||
@ -42,17 +42,17 @@ pub(crate) async fn open_workspace_handler(
|
||||
data_result(workspaces)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(data, core), err)]
|
||||
#[tracing::instrument(skip(data, folder), err)]
|
||||
pub(crate) async fn read_workspaces_handler(
|
||||
data: Data<QueryWorkspaceRequest>,
|
||||
core: Unit<Arc<CoreContext>>,
|
||||
folder: Unit<Arc<FolderManager>>,
|
||||
) -> DataResult<RepeatedWorkspace, FlowyError> {
|
||||
let params: WorkspaceId = data.into_inner().try_into()?;
|
||||
let user_id = core.user.user_id()?;
|
||||
let workspace_controller = core.workspace_controller.clone();
|
||||
let user_id = folder.user.user_id()?;
|
||||
let workspace_controller = folder.workspace_controller.clone();
|
||||
|
||||
let trash_controller = core.trash_controller.clone();
|
||||
let workspaces = core.persistence.begin_transaction(|transaction| {
|
||||
let trash_controller = folder.trash_controller.clone();
|
||||
let workspaces = folder.persistence.begin_transaction(|transaction| {
|
||||
let mut workspaces =
|
||||
workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, &transaction)?;
|
||||
for workspace in workspaces.iter_mut() {
|
||||
@ -61,41 +61,40 @@ pub(crate) async fn read_workspaces_handler(
|
||||
}
|
||||
Ok(workspaces)
|
||||
})?;
|
||||
let _ = read_workspaces_on_server(core, user_id, params);
|
||||
let _ = read_workspaces_on_server(folder, user_id, params);
|
||||
data_result(workspaces)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(core), err)]
|
||||
#[tracing::instrument(skip(folder), err)]
|
||||
pub async fn read_cur_workspace_handler(
|
||||
core: Unit<Arc<CoreContext>>,
|
||||
folder: Unit<Arc<FolderManager>>,
|
||||
) -> DataResult<CurrentWorkspaceSetting, FlowyError> {
|
||||
let workspace_id = get_current_workspace()?;
|
||||
let user_id = core.user.user_id()?;
|
||||
let user_id = folder.user.user_id()?;
|
||||
let params = WorkspaceId {
|
||||
workspace_id: Some(workspace_id.clone()),
|
||||
};
|
||||
|
||||
let workspace = core.persistence.begin_transaction(|transaction| {
|
||||
core.workspace_controller
|
||||
let workspace = folder.persistence.begin_transaction(|transaction| {
|
||||
folder
|
||||
.workspace_controller
|
||||
.read_local_workspace(workspace_id, &user_id, &transaction)
|
||||
})?;
|
||||
|
||||
let latest_view: Option<View> = core.view_controller.latest_visit_view().unwrap_or(None);
|
||||
let latest_view: Option<View> = folder.view_controller.latest_visit_view().unwrap_or(None);
|
||||
let setting = CurrentWorkspaceSetting { workspace, latest_view };
|
||||
let _ = read_workspaces_on_server(core, user_id, params);
|
||||
let _ = read_workspaces_on_server(folder, user_id, params);
|
||||
data_result(setting)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(core), err)]
|
||||
#[tracing::instrument(level = "debug", skip(folder_manager), err)]
|
||||
fn read_workspaces_on_server(
|
||||
core: Unit<Arc<CoreContext>>,
|
||||
folder_manager: Unit<Arc<FolderManager>>,
|
||||
user_id: String,
|
||||
params: WorkspaceId,
|
||||
) -> Result<(), FlowyError> {
|
||||
let (token, server) = (core.user.token()?, core.cloud_service.clone());
|
||||
let _app_ctrl = core.app_controller.clone();
|
||||
let _view_ctrl = core.view_controller.clone();
|
||||
let persistence = core.persistence.clone();
|
||||
let (token, server) = (folder_manager.user.token()?, folder_manager.cloud_service.clone());
|
||||
let persistence = folder_manager.persistence.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let workspaces = server.read_workspace(&token, params).await?;
|
||||
|
@ -1,11 +1,6 @@
|
||||
use crate::{
|
||||
controller::DocumentController,
|
||||
errors::FlowyError,
|
||||
ws_receivers::DocumentWSReceivers,
|
||||
DocumentCloudService,
|
||||
};
|
||||
use crate::{controller::DocumentController, errors::FlowyError};
|
||||
use flowy_database::ConnectionPool;
|
||||
use flowy_sync::RevisionWebSocket;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub trait DocumentUser: Send + Sync {
|
||||
@ -21,24 +16,6 @@ pub struct DocumentContext {
|
||||
}
|
||||
|
||||
impl DocumentContext {
|
||||
pub fn new(
|
||||
user: Arc<dyn DocumentUser>,
|
||||
ws_receivers: Arc<DocumentWSReceivers>,
|
||||
ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
cloud_service: Arc<dyn DocumentCloudService>,
|
||||
) -> DocumentContext {
|
||||
let doc_ctrl = Arc::new(DocumentController::new(
|
||||
cloud_service,
|
||||
user.clone(),
|
||||
ws_receivers,
|
||||
ws_sender,
|
||||
));
|
||||
Self {
|
||||
controller: doc_ctrl,
|
||||
user,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(&self) -> Result<(), FlowyError> {
|
||||
let _ = self.controller.init()?;
|
||||
Ok(())
|
||||
|
@ -1,49 +1,52 @@
|
||||
use crate::{
|
||||
context::DocumentUser,
|
||||
core::ClientDocumentEditor,
|
||||
errors::FlowyError,
|
||||
ws_receivers::DocumentWSReceivers,
|
||||
DocumentCloudService,
|
||||
};
|
||||
use crate::{context::DocumentUser, core::ClientDocumentEditor, errors::FlowyError, DocumentCloudService};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use dashmap::DashMap;
|
||||
use flowy_collaboration::entities::{
|
||||
doc::{DocumentDelta, DocumentId},
|
||||
revision::{md5, RepeatedRevision, Revision},
|
||||
ws::ServerRevisionWSData,
|
||||
};
|
||||
use flowy_database::ConnectionPool;
|
||||
use flowy_error::FlowyResult;
|
||||
use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket, WSStateReceiver};
|
||||
use lib_infra::future::FutureResult;
|
||||
use std::sync::Arc;
|
||||
use lib_ws::WSConnectState;
|
||||
use std::{convert::TryInto, sync::Arc};
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait DocumentWSReceiver: Send + Sync {
|
||||
async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError>;
|
||||
fn connect_state_changed(&self, state: WSConnectState);
|
||||
}
|
||||
type WebSocketDataReceivers = Arc<DashMap<String, Arc<dyn DocumentWSReceiver>>>;
|
||||
pub struct DocumentController {
|
||||
cloud_service: Arc<dyn DocumentCloudService>,
|
||||
ws_receivers: Arc<DocumentWSReceivers>,
|
||||
ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
ws_receivers: WebSocketDataReceivers,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
open_cache: Arc<OpenDocCache>,
|
||||
user: Arc<dyn DocumentUser>,
|
||||
}
|
||||
|
||||
impl DocumentController {
|
||||
pub(crate) fn new(
|
||||
pub fn new(
|
||||
cloud_service: Arc<dyn DocumentCloudService>,
|
||||
user: Arc<dyn DocumentUser>,
|
||||
ws_receivers: Arc<DocumentWSReceivers>,
|
||||
ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
) -> Self {
|
||||
let ws_receivers = Arc::new(DashMap::new());
|
||||
let open_cache = Arc::new(OpenDocCache::new());
|
||||
Self {
|
||||
cloud_service,
|
||||
ws_receivers,
|
||||
ws_sender,
|
||||
web_socket,
|
||||
open_cache,
|
||||
user,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn init(&self) -> FlowyResult<()> {
|
||||
let notify = self.ws_sender.subscribe_state_changed();
|
||||
let notify = self.web_socket.subscribe_state_changed();
|
||||
listen_ws_state_changed(notify, self.ws_receivers.clone());
|
||||
|
||||
Ok(())
|
||||
@ -61,7 +64,7 @@ impl DocumentController {
|
||||
let doc_id = doc_id.as_ref();
|
||||
tracing::Span::current().record("doc_id", &doc_id);
|
||||
self.open_cache.remove(doc_id);
|
||||
self.ws_receivers.remove(doc_id);
|
||||
self.remove_ws_receiver(doc_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -70,7 +73,7 @@ impl DocumentController {
|
||||
let doc_id = doc_id.as_ref();
|
||||
tracing::Span::current().record("doc_id", &doc_id);
|
||||
self.open_cache.remove(doc_id);
|
||||
self.ws_receivers.remove(doc_id);
|
||||
self.remove_ws_receiver(doc_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -93,6 +96,25 @@ impl DocumentController {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn did_receive_ws_data(&self, data: Bytes) {
|
||||
let data: ServerRevisionWSData = data.try_into().unwrap();
|
||||
match self.ws_receivers.get(&data.object_id) {
|
||||
None => tracing::error!("Can't find any source handler for {:?}", data.object_id),
|
||||
Some(handler) => match handler.receive_ws_data(data).await {
|
||||
Ok(_) => {},
|
||||
Err(e) => tracing::error!("{}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ws_connect_state_changed(&self, state: &WSConnectState) {
|
||||
for receiver in self.ws_receivers.iter() {
|
||||
receiver.value().connect_state_changed(state.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DocumentController {
|
||||
async fn get_editor(&self, doc_id: &str) -> FlowyResult<Arc<ClientDocumentEditor>> {
|
||||
match self.open_cache.get(doc_id) {
|
||||
None => {
|
||||
@ -102,9 +124,7 @@ impl DocumentController {
|
||||
Some(editor) => Ok(editor),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DocumentController {
|
||||
async fn make_editor(
|
||||
&self,
|
||||
doc_id: &str,
|
||||
@ -117,8 +137,8 @@ impl DocumentController {
|
||||
token,
|
||||
server: self.cloud_service.clone(),
|
||||
});
|
||||
let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?;
|
||||
self.ws_receivers.add(doc_id, doc_editor.ws_handler());
|
||||
let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.web_socket.clone(), server).await?;
|
||||
self.add_ws_receiver(doc_id, doc_editor.ws_handler());
|
||||
self.open_cache.insert(&doc_id, &doc_editor);
|
||||
Ok(doc_editor)
|
||||
}
|
||||
@ -128,6 +148,15 @@ impl DocumentController {
|
||||
let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool));
|
||||
Ok(RevisionManager::new(&user_id, doc_id, cache))
|
||||
}
|
||||
|
||||
fn add_ws_receiver(&self, object_id: &str, receiver: Arc<dyn DocumentWSReceiver>) {
|
||||
if self.ws_receivers.contains_key(object_id) {
|
||||
log::error!("Duplicate handler registered for {:?}", object_id);
|
||||
}
|
||||
self.ws_receivers.insert(object_id.to_string(), receiver);
|
||||
}
|
||||
|
||||
fn remove_ws_receiver(&self, id: &str) { self.ws_receivers.remove(id); }
|
||||
}
|
||||
|
||||
struct RevisionServerImpl {
|
||||
@ -194,10 +223,12 @@ impl OpenDocCache {
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(state_receiver, receivers))]
|
||||
fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc<DocumentWSReceivers>) {
|
||||
fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: WebSocketDataReceivers) {
|
||||
tokio::spawn(async move {
|
||||
while let Ok(state) = state_receiver.recv().await {
|
||||
receivers.ws_connect_state_changed(&state).await;
|
||||
for receiver in receivers.iter() {
|
||||
receiver.value().connect_state_changed(state.clone());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ use crate::{
|
||||
context::DocumentUser,
|
||||
core::{make_document_ws_manager, EditorCommand, EditorCommandQueue, EditorCommandSender},
|
||||
errors::FlowyError,
|
||||
ws_receivers::DocumentWSReceiver,
|
||||
DocumentWSReceiver,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use flowy_collaboration::{
|
||||
@ -38,7 +38,7 @@ impl ClientDocumentEditor {
|
||||
doc_id: &str,
|
||||
user: Arc<dyn DocumentUser>,
|
||||
mut rev_manager: RevisionManager,
|
||||
ws: Arc<dyn RevisionWebSocket>,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
server: Arc<dyn RevisionCloudService>,
|
||||
) -> FlowyResult<Arc<Self>> {
|
||||
let document_info = rev_manager.load::<DocumentInfoBuilder>(server).await?;
|
||||
@ -53,7 +53,7 @@ impl ClientDocumentEditor {
|
||||
user_id.clone(),
|
||||
edit_cmd_tx.clone(),
|
||||
rev_manager.clone(),
|
||||
ws,
|
||||
web_socket,
|
||||
)
|
||||
.await;
|
||||
let editor = Arc::new(Self {
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
core::{EditorCommand, TransformDeltas, SYNC_INTERVAL_IN_MILLIS},
|
||||
ws_receivers::DocumentWSReceiver,
|
||||
DocumentWSReceiver,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
@ -37,7 +37,7 @@ pub(crate) async fn make_document_ws_manager(
|
||||
user_id: String,
|
||||
edit_cmd_tx: EditorCommandSender,
|
||||
rev_manager: Arc<RevisionManager>,
|
||||
ws_conn: Arc<dyn RevisionWebSocket>,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
) -> Arc<RevisionWebSocketManager> {
|
||||
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
|
||||
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
|
||||
@ -50,7 +50,7 @@ pub(crate) async fn make_document_ws_manager(
|
||||
let ping_duration = Duration::from_millis(SYNC_INTERVAL_IN_MILLIS);
|
||||
let ws_manager = Arc::new(RevisionWebSocketManager::new(
|
||||
&doc_id,
|
||||
ws_conn,
|
||||
web_socket,
|
||||
data_provider,
|
||||
ws_stream_consumer,
|
||||
ping_duration,
|
||||
|
@ -1,10 +1,9 @@
|
||||
pub mod context;
|
||||
pub(crate) mod controller;
|
||||
mod controller;
|
||||
pub mod core;
|
||||
// mod notify;
|
||||
pub mod protobuf;
|
||||
pub mod ws_receivers;
|
||||
|
||||
pub use controller::*;
|
||||
pub mod errors {
|
||||
pub use flowy_error::{internal_error, ErrorCode, FlowyError};
|
||||
}
|
||||
|
@ -1,56 +0,0 @@
|
||||
use crate::errors::FlowyError;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use dashmap::DashMap;
|
||||
use flowy_collaboration::entities::ws::{ServerRevisionWSData};
|
||||
use lib_ws::WSConnectState;
|
||||
use std::{convert::TryInto, sync::Arc};
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait DocumentWSReceiver: Send + Sync {
|
||||
async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError>;
|
||||
fn connect_state_changed(&self, state: WSConnectState);
|
||||
}
|
||||
|
||||
pub struct DocumentWSReceivers {
|
||||
// key: the document id
|
||||
// value: DocumentWSReceiver
|
||||
receivers: Arc<DashMap<String, Arc<dyn DocumentWSReceiver>>>,
|
||||
}
|
||||
|
||||
impl std::default::Default for DocumentWSReceivers {
|
||||
fn default() -> Self {
|
||||
let receivers: Arc<DashMap<String, Arc<dyn DocumentWSReceiver>>> = Arc::new(DashMap::new());
|
||||
DocumentWSReceivers { receivers }
|
||||
}
|
||||
}
|
||||
|
||||
impl DocumentWSReceivers {
|
||||
pub fn new() -> Self { DocumentWSReceivers::default() }
|
||||
|
||||
pub(crate) fn add(&self, doc_id: &str, receiver: Arc<dyn DocumentWSReceiver>) {
|
||||
if self.receivers.contains_key(doc_id) {
|
||||
log::error!("Duplicate handler registered for {:?}", doc_id);
|
||||
}
|
||||
self.receivers.insert(doc_id.to_string(), receiver);
|
||||
}
|
||||
|
||||
pub(crate) fn remove(&self, id: &str) { self.receivers.remove(id); }
|
||||
|
||||
pub async fn did_receive_data(&self, data: Bytes) {
|
||||
let data: ServerRevisionWSData = data.try_into().unwrap();
|
||||
match self.receivers.get(&data.object_id) {
|
||||
None => tracing::error!("Can't find any source handler for {:?}", data.object_id),
|
||||
Some(handler) => match handler.receive_ws_data(data).await {
|
||||
Ok(_) => {},
|
||||
Err(e) => tracing::error!("{}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ws_connect_state_changed(&self, state: &WSConnectState) {
|
||||
for receiver in self.receivers.iter() {
|
||||
receiver.value().connect_state_changed(state.clone());
|
||||
}
|
||||
}
|
||||
}
|
@ -12,42 +12,40 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
pub trait DocumentCloudStorage: Send + Sync {
|
||||
pub trait RevisionCloudStorage: Send + Sync {
|
||||
fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
|
||||
fn get_revisions(
|
||||
&self,
|
||||
doc_id: &str,
|
||||
object_id: &str,
|
||||
rev_ids: Option<Vec<i64>>,
|
||||
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError>;
|
||||
|
||||
fn reset_document(
|
||||
fn reset_object(
|
||||
&self,
|
||||
doc_id: &str,
|
||||
object_id: &str,
|
||||
repeated_revision: RepeatedRevisionPB,
|
||||
) -> BoxResultFuture<(), CollaborateError>;
|
||||
}
|
||||
|
||||
pub(crate) struct LocalDocumentCloudPersistence {
|
||||
pub(crate) struct LocalRevisionCloudPersistence {
|
||||
// For the moment, we use memory to cache the data, it will be implemented with other storage.
|
||||
// Like the Firestore,Dropbox.etc.
|
||||
storage: Arc<dyn DocumentCloudStorage>,
|
||||
storage: Arc<dyn RevisionCloudStorage>,
|
||||
}
|
||||
|
||||
impl Debug for LocalDocumentCloudPersistence {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") }
|
||||
impl Debug for LocalRevisionCloudPersistence {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalRevisionCloudPersistence") }
|
||||
}
|
||||
|
||||
impl std::default::Default for LocalDocumentCloudPersistence {
|
||||
impl std::default::Default for LocalRevisionCloudPersistence {
|
||||
fn default() -> Self {
|
||||
LocalDocumentCloudPersistence {
|
||||
LocalRevisionCloudPersistence {
|
||||
storage: Arc::new(MemoryDocumentCloudStorage::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
|
||||
fn enable_sync(&self) -> bool { false }
|
||||
|
||||
impl DocumentCloudPersistence for LocalRevisionCloudPersistence {
|
||||
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
|
||||
let storage = self.storage.clone();
|
||||
let doc_id = doc_id.to_owned();
|
||||
@ -107,7 +105,7 @@ impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
|
||||
let storage = self.storage.clone();
|
||||
let doc_id = doc_id.to_owned();
|
||||
Box::pin(async move {
|
||||
let _ = storage.reset_document(&doc_id, revisions).await?;
|
||||
let _ = storage.reset_object(&doc_id, revisions).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
@ -117,7 +115,7 @@ struct MemoryDocumentCloudStorage {}
|
||||
impl std::default::Default for MemoryDocumentCloudStorage {
|
||||
fn default() -> Self { Self {} }
|
||||
}
|
||||
impl DocumentCloudStorage for MemoryDocumentCloudStorage {
|
||||
impl RevisionCloudStorage for MemoryDocumentCloudStorage {
|
||||
fn set_revisions(&self, _repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
|
||||
Box::pin(async move { Ok(()) })
|
||||
}
|
||||
@ -133,7 +131,7 @@ impl DocumentCloudStorage for MemoryDocumentCloudStorage {
|
||||
})
|
||||
}
|
||||
|
||||
fn reset_document(
|
||||
fn reset_object(
|
||||
&self,
|
||||
_doc_id: &str,
|
||||
_repeated_revision: RepeatedRevisionPB,
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::local_server::persistence::LocalDocumentCloudPersistence;
|
||||
use crate::local_server::persistence::LocalRevisionCloudPersistence;
|
||||
use async_stream::stream;
|
||||
use bytes::Bytes;
|
||||
use flowy_collaboration::{
|
||||
@ -35,7 +35,7 @@ impl LocalServer {
|
||||
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
|
||||
client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
|
||||
) -> Self {
|
||||
let persistence = Arc::new(LocalDocumentCloudPersistence::default());
|
||||
let persistence = Arc::new(LocalRevisionCloudPersistence::default());
|
||||
let doc_manager = Arc::new(ServerDocumentManager::new(persistence));
|
||||
let stop_tx = RwLock::new(None);
|
||||
|
||||
@ -122,7 +122,7 @@ impl LocalWebSocketRunner {
|
||||
client_data.ty,
|
||||
);
|
||||
let client_ws_sender = self.client_ws_sender.clone();
|
||||
let user = Arc::new(LocalDocumentUser {
|
||||
let user = Arc::new(LocalRevisionUser {
|
||||
user_id,
|
||||
client_ws_sender,
|
||||
});
|
||||
@ -144,15 +144,15 @@ impl LocalWebSocketRunner {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LocalDocumentUser {
|
||||
struct LocalRevisionUser {
|
||||
user_id: String,
|
||||
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
|
||||
}
|
||||
|
||||
impl RevisionUser for LocalDocumentUser {
|
||||
impl RevisionUser for LocalRevisionUser {
|
||||
fn user_id(&self) -> String { self.user_id.clone() }
|
||||
|
||||
fn receive(&self, resp: SyncResponse) {
|
||||
fn receive(&self, resp: RevisionSyncResponse) {
|
||||
let sender = self.client_ws_sender.clone();
|
||||
let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
|
||||
Ok(_) => {},
|
||||
@ -163,7 +163,7 @@ impl RevisionUser for LocalDocumentUser {
|
||||
|
||||
tokio::spawn(async move {
|
||||
match resp {
|
||||
SyncResponse::Pull(data) => {
|
||||
RevisionSyncResponse::Pull(data) => {
|
||||
let bytes: Bytes = data.try_into().unwrap();
|
||||
let msg = WebSocketRawMessage {
|
||||
module: WSModule::Doc,
|
||||
@ -171,7 +171,7 @@ impl RevisionUser for LocalDocumentUser {
|
||||
};
|
||||
send_fn(sender, msg);
|
||||
},
|
||||
SyncResponse::Push(data) => {
|
||||
RevisionSyncResponse::Push(data) => {
|
||||
let bytes: Bytes = data.try_into().unwrap();
|
||||
let msg = WebSocketRawMessage {
|
||||
module: WSModule::Doc,
|
||||
@ -179,7 +179,7 @@ impl RevisionUser for LocalDocumentUser {
|
||||
};
|
||||
send_fn(sender, msg);
|
||||
},
|
||||
SyncResponse::Ack(data) => {
|
||||
RevisionSyncResponse::Ack(data) => {
|
||||
let bytes: Bytes = data.try_into().unwrap();
|
||||
let msg = WebSocketRawMessage {
|
||||
module: WSModule::Doc,
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender};
|
||||
use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket};
|
||||
use dashmap::DashMap;
|
||||
use flowy_error::FlowyError;
|
||||
use lib_infra::future::FutureResult;
|
||||
@ -65,7 +65,7 @@ impl FlowyRawWebSocket for LocalWebSocket {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
|
||||
fn sender(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError> {
|
||||
let ws = LocalWebSocketAdaptor(self.server_ws_sender.clone());
|
||||
Ok(Arc::new(ws))
|
||||
}
|
||||
@ -74,7 +74,7 @@ impl FlowyRawWebSocket for LocalWebSocket {
|
||||
#[derive(Clone)]
|
||||
struct LocalWebSocketAdaptor(broadcast::Sender<WebSocketRawMessage>);
|
||||
|
||||
impl FlowyWSSender for LocalWebSocketAdaptor {
|
||||
impl FlowyWebSocket for LocalWebSocketAdaptor {
|
||||
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
|
||||
let _ = self.0.send(msg);
|
||||
Ok(())
|
||||
|
@ -16,10 +16,10 @@ pub trait FlowyRawWebSocket: Send + Sync {
|
||||
fn subscribe_connect_state(&self) -> broadcast::Receiver<WSConnectState>;
|
||||
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
|
||||
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError>;
|
||||
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError>;
|
||||
fn sender(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError>;
|
||||
}
|
||||
|
||||
pub trait FlowyWSSender: Send + Sync {
|
||||
pub trait FlowyWebSocket: Send + Sync {
|
||||
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError>;
|
||||
}
|
||||
|
||||
@ -101,12 +101,12 @@ impl FlowyWebSocketConnect {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { self.inner.sender() }
|
||||
pub fn web_socket(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError> { self.inner.sender() }
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(ws_conn))]
|
||||
pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
|
||||
let ws = ws_conn.inner.clone();
|
||||
let raw_web_socket = ws_conn.inner.clone();
|
||||
let mut notify = ws_conn.inner.subscribe_connect_state();
|
||||
let _ = tokio::spawn(async move {
|
||||
loop {
|
||||
@ -117,7 +117,7 @@ pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
|
||||
WSConnectState::Init => {},
|
||||
WSConnectState::Connected => {},
|
||||
WSConnectState::Connecting => {},
|
||||
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
|
||||
WSConnectState::Disconnected => retry_connect(raw_web_socket.clone(), 100).await,
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender};
|
||||
use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket};
|
||||
use flowy_error::internal_error;
|
||||
pub use flowy_error::FlowyError;
|
||||
use lib_infra::future::FutureResult;
|
||||
@ -42,13 +42,13 @@ impl FlowyRawWebSocket for Arc<WSController> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
|
||||
fn sender(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError> {
|
||||
let sender = self.ws_message_sender().map_err(internal_error)?;
|
||||
Ok(sender)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowyWSSender for WSSender {
|
||||
impl FlowyWebSocket for WSSender {
|
||||
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
|
||||
let _ = self.send_msg(msg).map_err(internal_error)?;
|
||||
Ok(())
|
||||
|
@ -1,13 +1,22 @@
|
||||
use backend_service::configuration::ClientServerConfiguration;
|
||||
use bytes::Bytes;
|
||||
use flowy_collaboration::entities::ws::ClientRevisionWSData;
|
||||
use flowy_core::{
|
||||
errors::FlowyError,
|
||||
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
|
||||
controller::FolderManager,
|
||||
errors::{internal_error, FlowyError},
|
||||
module::{init_folder, WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
|
||||
};
|
||||
use flowy_database::ConnectionPool;
|
||||
use flowy_net::{http_server::core::CoreHttpCloudService, local_server::LocalServer};
|
||||
use flowy_document::context::DocumentContext;
|
||||
use flowy_net::{
|
||||
http_server::core::CoreHttpCloudService,
|
||||
local_server::LocalServer,
|
||||
ws::connection::FlowyWebSocketConnect,
|
||||
};
|
||||
use flowy_sync::{RevisionWebSocket, WSStateReceiver};
|
||||
use flowy_user::services::UserSession;
|
||||
|
||||
use std::sync::Arc;
|
||||
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
|
||||
use std::{convert::TryInto, sync::Arc};
|
||||
|
||||
pub struct CoreDepsResolver();
|
||||
impl CoreDepsResolver {
|
||||
@ -15,18 +24,22 @@ impl CoreDepsResolver {
|
||||
local_server: Option<Arc<LocalServer>>,
|
||||
user_session: Arc<UserSession>,
|
||||
server_config: &ClientServerConfiguration,
|
||||
) -> (
|
||||
Arc<dyn WorkspaceUser>,
|
||||
Arc<dyn WorkspaceDatabase>,
|
||||
Arc<dyn WorkspaceCloudService>,
|
||||
) {
|
||||
flowy_document: &Arc<DocumentContext>,
|
||||
ws_conn: Arc<FlowyWebSocketConnect>,
|
||||
) -> Arc<FolderManager> {
|
||||
let user: Arc<dyn WorkspaceUser> = Arc::new(WorkspaceUserImpl(user_session.clone()));
|
||||
let database: Arc<dyn WorkspaceDatabase> = Arc::new(WorkspaceDatabaseImpl(user_session));
|
||||
let ws_sender = Arc::new(FolderWebSocketImpl(ws_conn.clone()));
|
||||
let cloud_service: Arc<dyn WorkspaceCloudService> = match local_server {
|
||||
None => Arc::new(CoreHttpCloudService::new(server_config.clone())),
|
||||
Some(local_server) => local_server,
|
||||
};
|
||||
(user, database, cloud_service)
|
||||
|
||||
let folder_manager = init_folder(user, database, flowy_document.clone(), cloud_service, ws_sender);
|
||||
let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone()));
|
||||
ws_conn.add_ws_message_receiver(receiver).unwrap();
|
||||
|
||||
folder_manager
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,3 +56,30 @@ impl WorkspaceUser for WorkspaceUserImpl {
|
||||
|
||||
fn token(&self) -> Result<String, FlowyError> { self.0.token().map_err(|e| FlowyError::internal().context(e)) }
|
||||
}
|
||||
|
||||
struct FolderWebSocketImpl(Arc<FlowyWebSocketConnect>);
|
||||
impl RevisionWebSocket for FolderWebSocketImpl {
|
||||
fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> {
|
||||
let bytes: Bytes = data.try_into().unwrap();
|
||||
let msg = WebSocketRawMessage {
|
||||
module: WSModule::Folder,
|
||||
data: bytes.to_vec(),
|
||||
};
|
||||
let sender = self.0.web_socket()?;
|
||||
sender.send(msg).map_err(internal_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() }
|
||||
}
|
||||
|
||||
struct FolderWSMessageReceiverImpl(Arc<FolderManager>);
|
||||
impl WSMessageReceiver for FolderWSMessageReceiverImpl {
|
||||
fn source(&self) -> WSModule { WSModule::Folder }
|
||||
fn receive_message(&self, msg: WebSocketRawMessage) {
|
||||
let handler = self.0.clone();
|
||||
tokio::spawn(async move {
|
||||
handler.did_receive_ws_data(Bytes::from(msg.data)).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -3,10 +3,10 @@ use bytes::Bytes;
|
||||
use flowy_collaboration::entities::ws::ClientRevisionWSData;
|
||||
use flowy_database::ConnectionPool;
|
||||
use flowy_document::{
|
||||
context::DocumentUser,
|
||||
context::{DocumentContext, DocumentUser},
|
||||
errors::{internal_error, FlowyError},
|
||||
ws_receivers::DocumentWSReceivers,
|
||||
DocumentCloudService,
|
||||
DocumentController,
|
||||
};
|
||||
use flowy_net::{
|
||||
http_server::document::DocumentHttpCloudService,
|
||||
@ -18,13 +18,6 @@ use flowy_user::services::UserSession;
|
||||
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
|
||||
use std::{convert::TryInto, path::Path, sync::Arc};
|
||||
|
||||
pub struct DocumentDependencies {
|
||||
pub user: Arc<dyn DocumentUser>,
|
||||
pub ws_receivers: Arc<DocumentWSReceivers>,
|
||||
pub ws_sender: Arc<dyn RevisionWebSocket>,
|
||||
pub cloud_service: Arc<dyn DocumentCloudService>,
|
||||
}
|
||||
|
||||
pub struct DocumentDepsResolver();
|
||||
impl DocumentDepsResolver {
|
||||
pub fn resolve(
|
||||
@ -32,23 +25,21 @@ impl DocumentDepsResolver {
|
||||
ws_conn: Arc<FlowyWebSocketConnect>,
|
||||
user_session: Arc<UserSession>,
|
||||
server_config: &ClientServerConfiguration,
|
||||
) -> DocumentDependencies {
|
||||
) -> DocumentContext {
|
||||
let user = Arc::new(DocumentUserImpl(user_session));
|
||||
let ws_sender = Arc::new(DocumentWebSocketImpl(ws_conn.clone()));
|
||||
let ws_receivers = Arc::new(DocumentWSReceivers::new());
|
||||
let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone()));
|
||||
ws_conn.add_ws_message_receiver(receiver).unwrap();
|
||||
|
||||
let cloud_service: Arc<dyn DocumentCloudService> = match local_server {
|
||||
None => Arc::new(DocumentHttpCloudService::new(server_config.clone())),
|
||||
Some(local_server) => local_server,
|
||||
};
|
||||
|
||||
DocumentDependencies {
|
||||
let document_controller = Arc::new(DocumentController::new(cloud_service, user.clone(), ws_sender));
|
||||
let receiver = Arc::new(DocumentWSMessageReceiverImpl(document_controller.clone()));
|
||||
ws_conn.add_ws_message_receiver(receiver).unwrap();
|
||||
|
||||
DocumentContext {
|
||||
controller: document_controller,
|
||||
user,
|
||||
ws_receivers,
|
||||
ws_sender,
|
||||
cloud_service,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -80,7 +71,7 @@ impl RevisionWebSocket for DocumentWebSocketImpl {
|
||||
module: WSModule::Doc,
|
||||
data: bytes.to_vec(),
|
||||
};
|
||||
let sender = self.0.ws_sender()?;
|
||||
let sender = self.0.web_socket()?;
|
||||
sender.send(msg).map_err(internal_error)?;
|
||||
Ok(())
|
||||
}
|
||||
@ -88,13 +79,13 @@ impl RevisionWebSocket for DocumentWebSocketImpl {
|
||||
fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() }
|
||||
}
|
||||
|
||||
struct WSMessageReceiverImpl(Arc<DocumentWSReceivers>);
|
||||
impl WSMessageReceiver for WSMessageReceiverImpl {
|
||||
struct DocumentWSMessageReceiverImpl(Arc<DocumentController>);
|
||||
impl WSMessageReceiver for DocumentWSMessageReceiverImpl {
|
||||
fn source(&self) -> WSModule { WSModule::Doc }
|
||||
fn receive_message(&self, msg: WebSocketRawMessage) {
|
||||
let receivers = self.0.clone();
|
||||
let handler = self.0.clone();
|
||||
tokio::spawn(async move {
|
||||
receivers.did_receive_data(Bytes::from(msg.data)).await;
|
||||
handler.did_receive_ws_data(Bytes::from(msg.data)).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ mod deps_resolve;
|
||||
pub mod module;
|
||||
use crate::deps_resolve::*;
|
||||
use backend_service::configuration::ClientServerConfiguration;
|
||||
use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
|
||||
use flowy_core::{controller::FolderManager, errors::FlowyError};
|
||||
use flowy_document::context::DocumentContext;
|
||||
use flowy_net::{
|
||||
entities::NetworkType,
|
||||
@ -83,7 +83,7 @@ pub struct FlowySDK {
|
||||
config: FlowySDKConfig,
|
||||
pub user_session: Arc<UserSession>,
|
||||
pub document_ctx: Arc<DocumentContext>,
|
||||
pub core: Arc<CoreContext>,
|
||||
pub core: Arc<FolderManager>,
|
||||
pub dispatcher: Arc<EventDispatcher>,
|
||||
pub ws_conn: Arc<FlowyWebSocketConnect>,
|
||||
pub local_server: Option<Arc<LocalServer>>,
|
||||
@ -108,7 +108,13 @@ impl FlowySDK {
|
||||
|
||||
let user_session = mk_user_session(&config, &local_server, &config.server_config);
|
||||
let flowy_document = mk_document(&local_server, &ws_conn, &user_session, &config.server_config);
|
||||
let core_ctx = mk_core_context(&local_server, &user_session, &flowy_document, &config.server_config);
|
||||
let core_ctx = mk_core_context(
|
||||
&local_server,
|
||||
&user_session,
|
||||
&flowy_document,
|
||||
&config.server_config,
|
||||
&ws_conn,
|
||||
);
|
||||
|
||||
//
|
||||
let modules = mk_modules(&ws_conn, &core_ctx, &user_session);
|
||||
@ -134,12 +140,12 @@ fn _init(
|
||||
dispatch: &EventDispatcher,
|
||||
ws_conn: &Arc<FlowyWebSocketConnect>,
|
||||
user_session: &Arc<UserSession>,
|
||||
core: &Arc<CoreContext>,
|
||||
folder_manager: &Arc<FolderManager>,
|
||||
) {
|
||||
let subscribe_user_status = user_session.notifier.subscribe_user_status();
|
||||
let subscribe_network_type = ws_conn.subscribe_network_ty();
|
||||
let core = core.clone();
|
||||
let cloned_core = core.clone();
|
||||
let folder_manager = folder_manager.clone();
|
||||
let cloned_folder_manager = folder_manager.clone();
|
||||
let user_session = user_session.clone();
|
||||
let ws_conn = ws_conn.clone();
|
||||
let local_server = local_server.clone();
|
||||
@ -152,36 +158,36 @@ fn _init(
|
||||
user_session.init();
|
||||
ws_conn.init().await;
|
||||
listen_on_websocket(ws_conn.clone());
|
||||
_listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await;
|
||||
_listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await;
|
||||
});
|
||||
|
||||
dispatch.spawn(async move {
|
||||
_listen_network_status(subscribe_network_type, cloned_core).await;
|
||||
_listen_network_status(subscribe_network_type, cloned_folder_manager).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn _listen_user_status(
|
||||
ws_conn: Arc<FlowyWebSocketConnect>,
|
||||
mut subscribe: broadcast::Receiver<UserStatus>,
|
||||
core: Arc<CoreContext>,
|
||||
folder_manager: Arc<FolderManager>,
|
||||
) {
|
||||
while let Ok(status) = subscribe.recv().await {
|
||||
let result = || async {
|
||||
match status {
|
||||
UserStatus::Login { token, user_id } => {
|
||||
let _ = core.user_did_sign_in(&token).await?;
|
||||
let _ = folder_manager.user_did_sign_in(&token).await?;
|
||||
let _ = ws_conn.start(token, user_id).await?;
|
||||
},
|
||||
UserStatus::Logout { .. } => {
|
||||
core.user_did_logout().await;
|
||||
folder_manager.user_did_logout().await;
|
||||
let _ = ws_conn.stop().await;
|
||||
},
|
||||
UserStatus::Expired { .. } => {
|
||||
core.user_session_expired().await;
|
||||
folder_manager.user_session_expired().await;
|
||||
let _ = ws_conn.stop().await;
|
||||
},
|
||||
UserStatus::SignUp { profile, ret } => {
|
||||
let _ = core.user_did_sign_up(&profile.token).await?;
|
||||
let _ = folder_manager.user_did_sign_up(&profile.token).await?;
|
||||
let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
|
||||
let _ = ret.send(());
|
||||
},
|
||||
@ -196,7 +202,7 @@ async fn _listen_user_status(
|
||||
}
|
||||
}
|
||||
|
||||
async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<CoreContext>) {
|
||||
async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<FolderManager>) {
|
||||
while let Ok(_new_type) = subscribe.recv().await {
|
||||
// core.network_state_changed(new_type);
|
||||
}
|
||||
@ -235,10 +241,15 @@ fn mk_core_context(
|
||||
user_session: &Arc<UserSession>,
|
||||
flowy_document: &Arc<DocumentContext>,
|
||||
server_config: &ClientServerConfiguration,
|
||||
) -> Arc<CoreContext> {
|
||||
let (user, database, cloud_service) =
|
||||
CoreDepsResolver::resolve(local_server.clone(), user_session.clone(), server_config);
|
||||
init_core(user, database, flowy_document.clone(), cloud_service)
|
||||
ws_conn: &Arc<FlowyWebSocketConnect>,
|
||||
) -> Arc<FolderManager> {
|
||||
CoreDepsResolver::resolve(
|
||||
local_server.clone(),
|
||||
user_session.clone(),
|
||||
server_config,
|
||||
flowy_document,
|
||||
ws_conn.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn mk_document(
|
||||
@ -247,16 +258,10 @@ pub fn mk_document(
|
||||
user_session: &Arc<UserSession>,
|
||||
server_config: &ClientServerConfiguration,
|
||||
) -> Arc<DocumentContext> {
|
||||
let dependencies = DocumentDepsResolver::resolve(
|
||||
Arc::new(DocumentDepsResolver::resolve(
|
||||
local_server.clone(),
|
||||
ws_conn.clone(),
|
||||
user_session.clone(),
|
||||
server_config,
|
||||
);
|
||||
Arc::new(DocumentContext::new(
|
||||
dependencies.user,
|
||||
dependencies.ws_receivers,
|
||||
dependencies.ws_sender,
|
||||
dependencies.cloud_service,
|
||||
))
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use flowy_core::context::CoreContext;
|
||||
use flowy_core::controller::FolderManager;
|
||||
use flowy_net::ws::connection::FlowyWebSocketConnect;
|
||||
use flowy_user::services::UserSession;
|
||||
use lib_dispatch::prelude::Module;
|
||||
@ -6,17 +6,17 @@ use std::sync::Arc;
|
||||
|
||||
pub fn mk_modules(
|
||||
ws_conn: &Arc<FlowyWebSocketConnect>,
|
||||
core: &Arc<CoreContext>,
|
||||
folder_manager: &Arc<FolderManager>,
|
||||
user_session: &Arc<UserSession>,
|
||||
) -> Vec<Module> {
|
||||
let user_module = mk_user_module(user_session.clone());
|
||||
let core_module = mk_core_module(core.clone());
|
||||
let core_module = mk_core_module(folder_manager.clone());
|
||||
let network_module = mk_network_module(ws_conn.clone());
|
||||
vec![user_module, core_module, network_module]
|
||||
}
|
||||
|
||||
fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module::create(user_session) }
|
||||
|
||||
fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) }
|
||||
fn mk_core_module(core: Arc<FolderManager>) -> Module { flowy_core::module::create(core) }
|
||||
|
||||
fn mk_network_module(ws_conn: Arc<FlowyWebSocketConnect>) -> Module { flowy_net::module::create(ws_conn) }
|
||||
|
@ -1,5 +1,4 @@
|
||||
use crate::{RevisionCache, RevisionRecord};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use flowy_collaboration::{
|
||||
entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
|
||||
@ -8,7 +7,6 @@ use flowy_collaboration::{
|
||||
use flowy_error::{FlowyError, FlowyResult};
|
||||
use futures_util::{future, stream, stream::StreamExt};
|
||||
use lib_infra::future::FutureResult;
|
||||
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
|
@ -43,7 +43,7 @@ pub struct RevisionWebSocketManager {
|
||||
pub object_id: String,
|
||||
data_provider: Arc<dyn RevisionWSSinkDataProvider>,
|
||||
stream_consumer: Arc<dyn RevisionWSSteamConsumer>,
|
||||
ws_conn: Arc<dyn RevisionWebSocket>,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
pub ws_passthrough_tx: Sender<ServerRevisionWSData>,
|
||||
ws_passthrough_rx: Option<Receiver<ServerRevisionWSData>>,
|
||||
pub state_passthrough_tx: broadcast::Sender<WSConnectState>,
|
||||
@ -53,7 +53,7 @@ pub struct RevisionWebSocketManager {
|
||||
impl RevisionWebSocketManager {
|
||||
pub fn new(
|
||||
object_id: &str,
|
||||
ws_conn: Arc<dyn RevisionWebSocket>,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
data_provider: Arc<dyn RevisionWSSinkDataProvider>,
|
||||
stream_consumer: Arc<dyn RevisionWSSteamConsumer>,
|
||||
ping_duration: Duration,
|
||||
@ -66,7 +66,7 @@ impl RevisionWebSocketManager {
|
||||
object_id,
|
||||
data_provider,
|
||||
stream_consumer,
|
||||
ws_conn,
|
||||
web_socket,
|
||||
ws_passthrough_tx,
|
||||
ws_passthrough_rx: Some(ws_passthrough_rx),
|
||||
state_passthrough_tx,
|
||||
@ -81,7 +81,7 @@ impl RevisionWebSocketManager {
|
||||
let sink = RevisionWSSink::new(
|
||||
&self.object_id,
|
||||
self.data_provider.clone(),
|
||||
self.ws_conn.clone(),
|
||||
self.web_socket.clone(),
|
||||
self.stop_sync_tx.subscribe(),
|
||||
ping_duration,
|
||||
);
|
||||
|
@ -2,7 +2,7 @@ use crate::{
|
||||
entities::{doc::DocumentInfo, ws::ServerRevisionWSDataBuilder},
|
||||
errors::{internal_error, CollaborateError, CollaborateResult},
|
||||
protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
|
||||
server_document::{document_pad::ServerDocument, RevisionSynchronizer, RevisionUser, SyncResponse},
|
||||
server_document::{document_pad::ServerDocument, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
|
||||
};
|
||||
use async_stream::stream;
|
||||
use dashmap::DashMap;
|
||||
@ -16,8 +16,6 @@ use tokio::{
|
||||
};
|
||||
|
||||
pub trait DocumentCloudPersistence: Send + Sync + Debug {
|
||||
fn enable_sync(&self) -> bool;
|
||||
|
||||
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
|
||||
|
||||
fn create_document(
|
||||
@ -78,9 +76,9 @@ impl ServerDocumentManager {
|
||||
};
|
||||
|
||||
if result.is_ok() {
|
||||
cloned_user.receive(SyncResponse::Ack(ServerRevisionWSDataBuilder::build_ack_message(
|
||||
&object_id, ack_id,
|
||||
)));
|
||||
cloned_user.receive(RevisionSyncResponse::Ack(
|
||||
ServerRevisionWSDataBuilder::build_ack_message(&object_id, ack_id),
|
||||
));
|
||||
}
|
||||
result
|
||||
}
|
||||
|
@ -22,10 +22,10 @@ use std::{
|
||||
|
||||
pub trait RevisionUser: Send + Sync + Debug {
|
||||
fn user_id(&self) -> String;
|
||||
fn receive(&self, resp: SyncResponse);
|
||||
fn receive(&self, resp: RevisionSyncResponse);
|
||||
}
|
||||
|
||||
pub enum SyncResponse {
|
||||
pub enum RevisionSyncResponse {
|
||||
Pull(ServerRevisionWSData),
|
||||
Push(ServerRevisionWSData),
|
||||
Ack(ServerRevisionWSData),
|
||||
@ -66,7 +66,7 @@ impl RevisionSynchronizer {
|
||||
let revisions = self.persistence.read_revisions(&doc_id, None).await?;
|
||||
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?;
|
||||
let data = ServerRevisionWSDataBuilder::build_push_message(&doc_id, repeated_revision);
|
||||
user.receive(SyncResponse::Push(data));
|
||||
user.receive(RevisionSyncResponse::Push(data));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@ -94,7 +94,7 @@ impl RevisionSynchronizer {
|
||||
end: first_revision.rev_id,
|
||||
};
|
||||
let msg = ServerRevisionWSDataBuilder::build_pull_message(&self.doc_id, range);
|
||||
user.receive(SyncResponse::Pull(msg));
|
||||
user.receive(RevisionSyncResponse::Pull(msg));
|
||||
}
|
||||
},
|
||||
Ordering::Equal => {
|
||||
@ -221,7 +221,7 @@ impl RevisionSynchronizer {
|
||||
match repeated_revision_from_revision_pbs(revisions) {
|
||||
Ok(repeated_revision) => {
|
||||
let data = ServerRevisionWSDataBuilder::build_push_message(&self.doc_id, repeated_revision);
|
||||
user.receive(SyncResponse::Push(data));
|
||||
user.receive(RevisionSyncResponse::Push(data));
|
||||
},
|
||||
Err(e) => tracing::error!("{}", e),
|
||||
}
|
||||
|
@ -14,7 +14,8 @@ pub struct WebSocketRawMessage {
|
||||
|
||||
#[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)]
|
||||
pub enum WSModule {
|
||||
Doc = 0,
|
||||
Doc = 0,
|
||||
Folder = 1,
|
||||
}
|
||||
|
||||
impl std::default::Default for WSModule {
|
||||
@ -25,6 +26,7 @@ impl ToString for WSModule {
|
||||
fn to_string(&self) -> String {
|
||||
match self {
|
||||
WSModule::Doc => "0".to_string(),
|
||||
WSModule::Folder => "1".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -216,6 +216,7 @@ impl ::protobuf::reflect::ProtobufValue for WebSocketRawMessage {
|
||||
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
|
||||
pub enum WSModule {
|
||||
Doc = 0,
|
||||
Folder = 1,
|
||||
}
|
||||
|
||||
impl ::protobuf::ProtobufEnum for WSModule {
|
||||
@ -226,6 +227,7 @@ impl ::protobuf::ProtobufEnum for WSModule {
|
||||
fn from_i32(value: i32) -> ::std::option::Option<WSModule> {
|
||||
match value {
|
||||
0 => ::std::option::Option::Some(WSModule::Doc),
|
||||
1 => ::std::option::Option::Some(WSModule::Folder),
|
||||
_ => ::std::option::Option::None
|
||||
}
|
||||
}
|
||||
@ -233,6 +235,7 @@ impl ::protobuf::ProtobufEnum for WSModule {
|
||||
fn values() -> &'static [Self] {
|
||||
static values: &'static [WSModule] = &[
|
||||
WSModule::Doc,
|
||||
WSModule::Folder,
|
||||
];
|
||||
values
|
||||
}
|
||||
@ -263,18 +266,20 @@ impl ::protobuf::reflect::ProtobufValue for WSModule {
|
||||
static file_descriptor_proto_data: &'static [u8] = b"\
|
||||
\n\tmsg.proto\"L\n\x13WebSocketRawMessage\x12!\n\x06module\x18\x01\x20\
|
||||
\x01(\x0e2\t.WSModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\
|
||||
\x04data*\x13\n\x08WSModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\
|
||||
\x04\0\0\x08\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\
|
||||
\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x1b\n\x0b\n\x04\x04\0\
|
||||
\x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\
|
||||
\x0c\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\r\x13\n\x0c\n\x05\x04\0\x02\
|
||||
\0\x03\x12\x03\x03\x16\x17\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\
|
||||
\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\
|
||||
\x01\x01\x12\x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\
|
||||
\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x08\x01\n\n\n\x03\x05\0\x01\x12\x03\
|
||||
\x06\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x0c\n\x0c\n\x05\x05\0\
|
||||
\x02\0\x01\x12\x03\x07\x04\x07\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\n\
|
||||
\x0bb\x06proto3\
|
||||
\x04data*\x1f\n\x08WSModule\x12\x07\n\x03Doc\x10\0\x12\n\n\x06Folder\x10\
|
||||
\x01J\x82\x02\n\x06\x12\x04\0\0\t\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\
|
||||
\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\
|
||||
\x1b\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\
|
||||
\x06\x12\x03\x03\x04\x0c\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\r\x13\n\
|
||||
\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x16\x17\n\x0b\n\x04\x04\0\x02\x01\
|
||||
\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\
|
||||
\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\
|
||||
\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\t\x01\n\n\n\x03\
|
||||
\x05\0\x01\x12\x03\x06\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x0c\
|
||||
\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x07\n\x0c\n\x05\x05\0\x02\0\
|
||||
\x02\x12\x03\x07\n\x0b\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x0f\n\
|
||||
\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\n\n\x0c\n\x05\x05\0\x02\x01\
|
||||
\x02\x12\x03\x08\r\x0eb\x06proto3\
|
||||
";
|
||||
|
||||
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
|
||||
|
@ -6,4 +6,5 @@ message WebSocketRawMessage {
|
||||
}
|
||||
enum WSModule {
|
||||
Doc = 0;
|
||||
Folder = 1;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user