enable local ws

This commit is contained in:
appflowy 2022-01-07 17:37:11 +08:00
parent f66ea6d224
commit 287698be9e
49 changed files with 525 additions and 546 deletions

18
backend/Cargo.lock generated
View File

@ -1387,6 +1387,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"dashmap",
"flowy-collaboration",
"flowy-derive",
"flowy-error",
"lib-dispatch",
@ -1413,7 +1415,6 @@ dependencies = [
"flowy-document",
"flowy-net",
"flowy-user",
"flowy-virtual-net",
"futures-core",
"lib-dispatch",
"lib-infra",
@ -1504,21 +1505,6 @@ dependencies = [
"validator",
]
[[package]]
name = "flowy-virtual-net"
version = "0.1.0"
dependencies = [
"bytes",
"dashmap",
"flowy-collaboration",
"flowy-net",
"lib-infra",
"lib-ws",
"parking_lot",
"tokio",
"tracing",
]
[[package]]
name = "fnv"
version = "1.0.7"

View File

@ -17,11 +17,9 @@ use backend::services::document::persistence::{read_document, reset_document};
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};
use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, DocumentId as DocumentIdPB};
use flowy_collaboration::sync::ServerDocumentManager;
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use lib_ot::core::Interval;
use flowy_net::services::ws::FlowyWSConnect;
pub struct DocumentTest {
server: TestServer,
flowy_test: FlowySDKTest,
@ -39,7 +37,7 @@ pub enum DocScript {
impl DocumentTest {
pub async fn new() -> Self {
let server = spawn_server().await;
let flowy_test = FlowySDKTest::setup_with(server.client_server_config.clone());
let flowy_test = FlowySDKTest::new(server.client_server_config.clone(), None);
Self { server, flowy_test }
}
@ -57,7 +55,7 @@ struct ScriptContext {
client_editor: Option<Arc<ClientDocumentEditor>>,
client_sdk: FlowySDKTest,
client_user_session: Arc<UserSession>,
ws_conn: Arc<FlowyWSConnect>,
ws_conn: Arc<FlowyWebSocketConnect>,
server: TestServer,
doc_id: String,
}
@ -65,7 +63,7 @@ struct ScriptContext {
impl ScriptContext {
async fn new(client_sdk: FlowySDKTest, server: TestServer) -> Self {
let user_session = client_sdk.user_session.clone();
let ws_manager = client_sdk.ws_manager.clone();
let ws_manager = client_sdk.ws_conn.clone();
let doc_id = create_doc(&client_sdk).await;
Self {
@ -80,7 +78,7 @@ impl ScriptContext {
async fn open_doc(&mut self) {
let doc_id = self.doc_id.clone();
let edit_context = self.client_sdk.document_ctx.controller.open(doc_id).await.unwrap();
let edit_context = self.client_sdk.document_ctx.controller.open_document(doc_id).await.unwrap();
self.client_editor = Some(edit_context);
}

View File

@ -22,4 +22,5 @@
"files.associations": {
"*.log.*": "log"
},
"editor.formatOnSave": true,
}

View File

@ -4,7 +4,6 @@ members = [
"lib-log",
"lib-sqlite",
"flowy-net",
"flowy-virtual-net",
"flowy-sdk",
"dart-ffi",
"flowy-user",

View File

@ -26,7 +26,7 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
let path: &str = c_str.to_str().unwrap();
let server_config = get_client_server_configuration().unwrap();
let config = FlowySDKConfig::new(path, server_config, "appflowy").log_filter("debug");
let config = FlowySDKConfig::new(path, server_config, "appflowy", None).log_filter("debug");
*FLOWY_SDK.write() = Some(Arc::new(FlowySDK::new(config)));
0

View File

@ -119,7 +119,7 @@ impl CoreContext {
.payload(repeated_workspace)
.send();
log::debug!("workspace initialize after sign up");
tracing::debug!("Create default workspace after sign up");
let _ = self.init(&token).await?;
Ok(())
}
@ -130,13 +130,13 @@ impl CoreContext {
return Ok(());
}
}
log::debug!("Start initializing flowy core");
tracing::debug!("Start initializing flowy core");
INIT_WORKSPACE.write().insert(token.to_owned(), true);
let _ = self.workspace_controller.init()?;
let _ = self.app_controller.init()?;
let _ = self.view_controller.init()?;
let _ = self.trash_controller.init()?;
log::debug!("Finish initializing core");
tracing::debug!("Finish initializing core");
Ok(())
}

View File

@ -129,7 +129,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn open_view(&self, params: DocumentId) -> Result<DocumentDelta, FlowyError> {
let doc_id = params.doc_id.clone();
let editor = self.document_ctx.controller.open(&params.doc_id).await?;
let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
KV::set_str(LATEST_VIEW_ID, doc_id.clone());
let document_json = editor.document_json().await?;
@ -141,7 +141,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> {
let _ = self.document_ctx.controller.close(&params.doc_id)?;
let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
Ok(())
}
@ -152,14 +152,14 @@ impl ViewController {
let _ = KV::remove(LATEST_VIEW_ID);
}
}
let _ = self.document_ctx.controller.close(&params.doc_id)?;
let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
let editor = self.document_ctx.controller.open(&params.doc_id).await?;
let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
let document_json = editor.document_json().await?;
let duplicate_params = CreateViewParams {
belong_to_id: view.belong_to_id.clone(),
@ -177,7 +177,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self, params), err)]
pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, FlowyError> {
let editor = self.document_ctx.controller.open(&params.doc_id).await?;
let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
let delta_json = editor.document_json().await?;
Ok(ExportData {
data: delta_json,

View File

@ -8,7 +8,7 @@ use flowy_test::{helper::*, FlowySDKTest};
#[tokio::test]
#[should_panic]
async fn view_delete() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let _ = test.init_user().await;
let test = ViewTest::new(&test).await;
@ -21,7 +21,7 @@ async fn view_delete() {
#[tokio::test]
async fn view_delete_then_putback() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let _ = test.init_user().await;
let test = ViewTest::new(&test).await;
@ -44,7 +44,7 @@ async fn view_delete_then_putback() {
#[tokio::test]
async fn view_delete_all() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let _ = test.init_user().await;
let test = ViewTest::new(&test).await;
@ -66,7 +66,7 @@ async fn view_delete_all() {
#[tokio::test]
async fn view_delete_all_permanent() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let _ = test.init_user().await;
let test = ViewTest::new(&test).await;
@ -85,7 +85,7 @@ async fn view_delete_all_permanent() {
#[tokio::test]
async fn view_open_doc() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let _ = test.init_user().await;
let test = ViewTest::new(&test).await;

View File

@ -42,7 +42,7 @@ async fn workspace_create_with_apps() {
#[tokio::test]
async fn workspace_create_with_invalid_name() {
for (name, code) in invalid_workspace_name_test_case() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let request = CreateWorkspaceRequest {
name,
desc: "".to_owned(),
@ -62,7 +62,7 @@ async fn workspace_create_with_invalid_name() {
#[tokio::test]
async fn workspace_update_with_invalid_name() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
for (name, code) in invalid_workspace_name_test_case() {
let request = CreateWorkspaceRequest {
name,

View File

@ -2,7 +2,7 @@ use crate::{
context::DocumentUser,
core::{
edit::ClientDocumentEditor,
revision::{RevisionCache, RevisionManager, RevisionServer},
revision::{DocumentRevisionCache, DocumentRevisionManager, RevisionServer},
DocumentWSReceivers,
DocumentWebSocket,
WSStateReceiver,
@ -54,14 +54,14 @@ impl DocumentController {
}
#[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
pub async fn open<T: AsRef<str>>(&self, doc_id: T) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
pub async fn open_document<T: AsRef<str>>(&self, doc_id: T) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
let doc_id = doc_id.as_ref();
tracing::Span::current().record("doc_id", &doc_id);
self.get_editor(doc_id).await
}
#[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
pub fn close<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
pub fn close_document<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
let doc_id = doc_id.as_ref();
tracing::Span::current().record("doc_id", &doc_id);
self.open_cache.remove(doc_id);
@ -127,10 +127,10 @@ impl DocumentController {
Ok(doc_editor)
}
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<DocumentRevisionManager, FlowyError> {
let user_id = self.user.user_id()?;
let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool));
Ok(RevisionManager::new(&user_id, doc_id, cache))
let cache = Arc::new(DocumentRevisionCache::new(&user_id, doc_id, pool));
Ok(DocumentRevisionManager::new(&user_id, doc_id, cache))
}
}

View File

@ -18,7 +18,7 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
pub struct ClientDocumentEditor {
pub doc_id: String,
rev_manager: Arc<RevisionManager>,
rev_manager: Arc<DocumentRevisionManager>,
ws_manager: Arc<dyn DocumentWebSocketManager>,
edit_queue: UnboundedSender<EditorCommand>,
}
@ -27,7 +27,7 @@ impl ClientDocumentEditor {
pub(crate) async fn new(
doc_id: &str,
user: Arc<dyn DocumentUser>,
mut rev_manager: RevisionManager,
mut rev_manager: DocumentRevisionManager,
ws: Arc<dyn DocumentWebSocket>,
server: Arc<dyn RevisionServer>,
) -> FlowyResult<Arc<Self>> {
@ -157,7 +157,7 @@ impl ClientDocumentEditor {
fn spawn_edit_queue(
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
rev_manager: Arc<DocumentRevisionManager>,
delta: RichTextDelta,
) -> UnboundedSender<EditorCommand> {
let (sender, receiver) = mpsc::unbounded_channel::<EditorCommand>();
@ -184,5 +184,5 @@ impl ClientDocumentEditor {
Ok(delta)
}
pub fn rev_manager(&self) -> Arc<RevisionManager> { self.rev_manager.clone() }
pub fn rev_manager(&self) -> Arc<DocumentRevisionManager> { self.rev_manager.clone() }
}

View File

@ -1,4 +1,4 @@
use crate::{context::DocumentUser, core::RevisionManager};
use crate::{context::DocumentUser, core::DocumentRevisionManager};
use async_stream::stream;
use flowy_collaboration::{
document::{history::UndoResult, Document, NewlineDoc},
@ -18,14 +18,14 @@ use tokio::sync::{mpsc, oneshot, RwLock};
pub(crate) struct EditorCommandQueue {
document: Arc<RwLock<Document>>,
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
rev_manager: Arc<DocumentRevisionManager>,
receiver: Option<mpsc::UnboundedReceiver<EditorCommand>>,
}
impl EditorCommandQueue {
pub(crate) fn new(
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
rev_manager: Arc<DocumentRevisionManager>,
delta: RichTextDelta,
receiver: mpsc::UnboundedReceiver<EditorCommand>,
) -> Self {

View File

@ -1,7 +1,7 @@
use crate::{
core::revision::{
disk::{DocumentRevisionDiskCache, RevisionChangeset, RevisionTableState, SQLitePersistence},
memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
memory::{DocumentRevisionMemoryCache, RevisionMemoryCacheDelegate},
},
errors::FlowyError,
};
@ -17,17 +17,17 @@ use std::{
};
use tokio::task::spawn_blocking;
pub struct RevisionCache {
pub struct DocumentRevisionCache {
doc_id: String,
disk_cache: Arc<dyn DocumentRevisionDiskCache<Error = FlowyError>>,
memory_cache: Arc<RevisionMemoryCache>,
memory_cache: Arc<DocumentRevisionMemoryCache>,
latest_rev_id: AtomicI64,
}
impl RevisionCache {
pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
impl DocumentRevisionCache {
pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> DocumentRevisionCache {
let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool));
let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
let memory_cache = Arc::new(DocumentRevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
let doc_id = doc_id.to_owned();
Self {
doc_id,
@ -44,7 +44,7 @@ impl RevisionCache {
write_to_disk: bool,
) -> FlowyResult<RevisionRecord> {
if self.memory_cache.contains(&revision.rev_id) {
return Err(FlowyError::internal().context(format!("Duplicate remote revision id: {}", revision.rev_id)));
return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state)));
}
let state = state.as_ref().clone();
let rev_id = revision.rev_id;
@ -53,6 +53,7 @@ impl RevisionCache {
state,
write_to_disk,
};
self.memory_cache.add(Cow::Borrowed(&record)).await;
self.set_latest_rev_id(rev_id);
Ok(record)
@ -131,10 +132,15 @@ impl RevisionCache {
}
impl RevisionMemoryCacheDelegate for Arc<SQLitePersistence> {
#[tracing::instrument(level = "debug", skip(self, records), fields(checkpoint_result), err)]
fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
let conn = &*self.pool.get().map_err(internal_error)?;
records.retain(|record| record.write_to_disk);
if !records.is_empty() {
tracing::Span::current().record(
"checkpoint_result",
&format!("{} records were saved", records.len()).as_str(),
);
let _ = self.write_revision_records(records, &conn)?;
}
Ok(())

View File

@ -1,5 +1,5 @@
use crate::{
core::{revision::RevisionCache, RevisionRecord},
core::{revision::DocumentRevisionCache, RevisionRecord},
errors::FlowyError,
};
use bytes::Bytes;
@ -22,16 +22,16 @@ pub trait RevisionServer: Send + Sync {
fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
}
pub struct RevisionManager {
pub struct DocumentRevisionManager {
pub(crate) doc_id: String,
user_id: String,
rev_id_counter: RevIdCounter,
cache: Arc<RevisionCache>,
cache: Arc<DocumentRevisionCache>,
sync_seq: Arc<RevisionSyncSequence>,
}
impl RevisionManager {
pub fn new(user_id: &str, doc_id: &str, cache: Arc<RevisionCache>) -> Self {
impl DocumentRevisionManager {
pub fn new(user_id: &str, doc_id: &str, cache: Arc<DocumentRevisionCache>) -> Self {
let rev_id_counter = RevIdCounter::new(0);
let sync_seq = Arc::new(RevisionSyncSequence::new());
Self {
@ -70,8 +70,8 @@ impl RevisionManager {
if revision.delta_data.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty"));
}
self.rev_id_counter.set(revision.rev_id);
let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
self.rev_id_counter.set(revision.rev_id);
Ok(())
}
@ -194,7 +194,7 @@ struct RevisionLoader {
doc_id: String,
user_id: String,
server: Arc<dyn RevisionServer>,
cache: Arc<RevisionCache>,
cache: Arc<DocumentRevisionCache>,
}
impl RevisionLoader {
@ -272,6 +272,6 @@ impl RevisionSyncSequence {
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionManager {
pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
impl DocumentRevisionManager {
pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> { self.cache.clone() }
}

View File

@ -2,7 +2,6 @@ use crate::core::RevisionRecord;
use dashmap::DashMap;
use flowy_collaboration::entities::revision::RevisionRange;
use flowy_error::{FlowyError, FlowyResult};
use futures_util::{stream, stream::StreamExt};
use std::{borrow::Cow, sync::Arc, time::Duration};
use tokio::{sync::RwLock, task::JoinHandle};
@ -11,7 +10,7 @@ pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync {
fn receive_ack(&self, doc_id: &str, rev_id: i64);
}
pub(crate) struct RevisionMemoryCache {
pub(crate) struct DocumentRevisionMemoryCache {
doc_id: String,
revs_map: Arc<DashMap<i64, RevisionRecord>>,
delegate: Arc<dyn RevisionMemoryCacheDelegate>,
@ -19,9 +18,9 @@ pub(crate) struct RevisionMemoryCache {
defer_save: RwLock<Option<JoinHandle<()>>>,
}
impl RevisionMemoryCache {
impl DocumentRevisionMemoryCache {
pub(crate) fn new(doc_id: &str, delegate: Arc<dyn RevisionMemoryCacheDelegate>) -> Self {
RevisionMemoryCache {
DocumentRevisionMemoryCache {
doc_id: doc_id.to_owned(),
revs_map: Arc::new(DashMap::new()),
delegate,
@ -38,15 +37,19 @@ impl RevisionMemoryCache {
Cow::Owned(record) => record,
};
let rev_id = record.revision.rev_id;
if self.revs_map.contains_key(&rev_id) {
return;
}
if let Some(rev_id) = self.pending_write_revs.read().await.last() {
if *rev_id >= record.revision.rev_id {
tracing::error!("Duplicated revision added to memory_cache");
return;
}
}
// TODO: Remove outdated revisions to reduce memory usage
self.revs_map.insert(record.revision.rev_id, record.clone());
self.pending_write_revs.write().await.push(record.revision.rev_id);
self.revs_map.insert(rev_id, record);
self.pending_write_revs.write().await.push(rev_id);
self.make_checkpoint().await;
}
@ -79,16 +82,19 @@ impl RevisionMemoryCache {
pub(crate) async fn reset_with_revisions(&self, revision_records: &[RevisionRecord]) -> FlowyResult<()> {
self.revs_map.clear();
self.pending_write_revs.write().await.clear();
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
stream::iter(revision_records)
.for_each(|record| async move {
self.add(Cow::Borrowed(record)).await;
})
.await;
let mut write_guard = self.pending_write_revs.write().await;
write_guard.clear();
for record in revision_records {
self.revs_map.insert(record.revision.rev_id, record.clone());
write_guard.push(record.revision.rev_id);
}
drop(write_guard);
self.make_checkpoint().await;
Ok(())
}
@ -107,9 +113,8 @@ impl RevisionMemoryCache {
let delegate = self.delegate.clone();
*self.defer_save.write().await = Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await;
tokio::time::sleep(Duration::from_millis(600)).await;
let mut revs_write_guard = pending_write_revs.write().await;
// TODO:
// It may cause performance issues because we hold the write lock of the
// rev_order and the lock will be released after the checkpoint has been written
// to the disk.

View File

@ -1,5 +1,5 @@
use crate::{
core::{web_socket::web_socket::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS},
core::{web_socket::ws_manager::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS},
ws_receivers::{DocumentWSReceiver, DocumentWebSocket},
};
use async_stream::stream;
@ -27,7 +27,7 @@ pub(crate) struct HttpWebSocketManager {
doc_id: String,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
ws: Arc<dyn DocumentWebSocket>,
ws_conn: Arc<dyn DocumentWebSocket>,
ws_msg_tx: UnboundedSender<DocumentServerWSData>,
ws_msg_rx: Option<UnboundedReceiver<DocumentServerWSData>>,
stop_sync_tx: SinkStopTx,
@ -37,7 +37,7 @@ pub(crate) struct HttpWebSocketManager {
impl HttpWebSocketManager {
pub(crate) fn new(
doc_id: &str,
ws: Arc<dyn DocumentWebSocket>,
ws_conn: Arc<dyn DocumentWebSocket>,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
) -> Self {
@ -49,7 +49,7 @@ impl HttpWebSocketManager {
doc_id,
data_provider,
stream_consumer,
ws,
ws_conn,
ws_msg_tx,
ws_msg_rx: Some(ws_msg_rx),
stop_sync_tx,
@ -64,7 +64,7 @@ impl HttpWebSocketManager {
let sink = DocumentWSSink::new(
&self.doc_id,
self.data_provider.clone(),
self.ws.clone(),
self.ws_conn.clone(),
self.stop_sync_tx.subscribe(),
);
let stream = DocumentWSStream::new(
@ -200,7 +200,6 @@ impl DocumentWSStream {
// Notify the user that someone has connected to this document
},
}
Ok(())
}
}
@ -260,7 +259,7 @@ impl DocumentWSSink {
.for_each(|_| async {
match self.send_next_revision().await {
Ok(_) => {},
Err(e) => log::error!("[DocumentSink]: send msg failed, {:?}", e),
Err(e) => log::error!("[DocumentSink]: Send failed, {:?}", e),
}
})
.await;
@ -273,6 +272,7 @@ impl DocumentWSSink {
Ok(())
},
Some(data) => {
tracing::debug!("[DocumentSink]: Try send: {}:{:?}-{}", data.doc_id, data.ty, data.id());
self.ws_sender.send(data).map_err(internal_error)
// let _ = tokio::time::timeout(Duration::from_millis(2000),
},

View File

@ -1,7 +1,7 @@
#![allow(clippy::module_inception)]
mod http_ws_impl;
mod local_ws_impl;
mod web_socket;
mod ws_manager;
pub(crate) use http_ws_impl::*;
pub(crate) use web_socket::*;
pub(crate) use ws_manager::*;

View File

@ -1,9 +1,9 @@
use crate::core::{
web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager},
DocumentRevisionManager,
DocumentWSReceiver,
DocumentWebSocket,
EditorCommand,
RevisionManager,
TransformDeltas,
};
use bytes::Bytes;
@ -17,7 +17,6 @@ use flowy_collaboration::{
use flowy_error::{internal_error, FlowyError, FlowyResult};
use lib_infra::future::FutureResult;
use crate::core::web_socket::local_ws_impl::LocalWebSocketManager;
use flowy_collaboration::entities::ws::DocumentServerWSDataType;
use lib_ws::WSConnectState;
@ -33,10 +32,31 @@ pub(crate) async fn make_document_ws_manager(
doc_id: String,
user_id: String,
edit_cmd_tx: UnboundedSender<EditorCommand>,
rev_manager: Arc<RevisionManager>,
ws: Arc<dyn DocumentWebSocket>,
rev_manager: Arc<DocumentRevisionManager>,
ws_conn: Arc<dyn DocumentWebSocket>,
) -> Arc<dyn DocumentWebSocketManager> {
if cfg!(feature = "http_server") {
// if cfg!(feature = "http_server") {
// let shared_sink =
// Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
// let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
// doc_id: doc_id.clone(),
// edit_cmd_tx,
// rev_manager: rev_manager.clone(),
// shared_sink: shared_sink.clone(),
// });
// let data_provider =
// Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
// let ws_manager = Arc::new(HttpWebSocketManager::new(
// &doc_id,
// ws_conn,
// data_provider,
// ws_stream_consumer,
// ));
// listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(),
// rev_manager); Arc::new(ws_manager)
// } else {
// Arc::new(Arc::new(LocalWebSocketManager {}))
// }
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
@ -44,25 +64,22 @@ pub(crate) async fn make_document_ws_manager(
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink);
let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
let ws_manager = Arc::new(HttpWebSocketManager::new(
&doc_id,
ws.clone(),
Arc::new(ws_stream_provider),
ws_conn,
data_provider,
ws_stream_consumer,
));
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager);
Arc::new(ws_manager)
} else {
Arc::new(Arc::new(LocalWebSocketManager {}))
}
}
fn listen_document_ws_state(
_user_id: &str,
_doc_id: &str,
mut subscriber: broadcast::Receiver<WSConnectState>,
_rev_manager: Arc<RevisionManager>,
_rev_manager: Arc<DocumentRevisionManager>,
) {
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
@ -79,7 +96,7 @@ fn listen_document_ws_state(
pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
pub(crate) doc_id: String,
pub(crate) edit_cmd_tx: UnboundedSender<EditorCommand>,
pub(crate) rev_manager: Arc<RevisionManager>,
pub(crate) rev_manager: Arc<DocumentRevisionManager>,
pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
}
@ -141,7 +158,7 @@ async fn transform_pushed_revisions(
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
pub(crate) async fn handle_remote_revision(
edit_cmd_tx: UnboundedSender<EditorCommand>,
rev_manager: Arc<RevisionManager>,
rev_manager: Arc<DocumentRevisionManager>,
bytes: Bytes,
) -> FlowyResult<Option<Revision>> {
let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
@ -202,12 +219,12 @@ enum SourceType {
#[derive(Clone)]
pub(crate) struct SharedWSSinkDataProvider {
shared: Arc<RwLock<VecDeque<DocumentClientWSData>>>,
rev_manager: Arc<RevisionManager>,
rev_manager: Arc<DocumentRevisionManager>,
source_ty: Arc<RwLock<SourceType>>,
}
impl SharedWSSinkDataProvider {
pub(crate) fn new(rev_manager: Arc<RevisionManager>) -> Self {
pub(crate) fn new(rev_manager: Arc<DocumentRevisionManager>) -> Self {
SharedWSSinkDataProvider {
shared: Arc::new(RwLock::new(VecDeque::new())),
rev_manager,
@ -241,7 +258,6 @@ impl SharedWSSinkDataProvider {
match self.rev_manager.next_sync_revision().await? {
Some(rev) => {
tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id);
let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
},

View File

@ -9,6 +9,7 @@ edition = "2018"
lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
lib-infra = { path = "../../../shared-lib/lib-infra" }
protobuf = {version = "2.18.0"}
lib-ws = { path = "../../../shared-lib/lib-ws" }
@ -19,6 +20,6 @@ parking_lot = "0.11"
strum = "0.21"
strum_macros = "0.21"
tracing = { version = "0.1", features = ["log"] }
dashmap = {version = "4.0"}
[features]
http_server = []

View File

@ -1,5 +1,4 @@
use crate::{entities::NetworkState, services::ws::FlowyWSConnect};
use crate::{entities::NetworkState, services::ws_conn::FlowyWebSocketConnect};
use flowy_error::FlowyError;
use lib_dispatch::prelude::{Data, Unit};
use std::sync::Arc;
@ -7,7 +6,7 @@ use std::sync::Arc;
#[tracing::instrument(skip(data, ws_manager))]
pub async fn update_network_ty(
data: Data<NetworkState>,
ws_manager: Unit<Arc<FlowyWSConnect>>,
ws_manager: Unit<Arc<FlowyWebSocketConnect>>,
) -> Result<(), FlowyError> {
let network_state = data.into_inner();
ws_manager.update_network_type(&network_state.ty);

View File

@ -1,10 +1,10 @@
use crate::{event::NetworkEvent, handlers::*, services::ws::FlowyWSConnect};
use crate::{event::NetworkEvent, handlers::*, services::ws_conn::FlowyWebSocketConnect};
use lib_dispatch::prelude::*;
use std::sync::Arc;
pub fn create(ws_manager: Arc<FlowyWSConnect>) -> Module {
pub fn create(ws_conn: Arc<FlowyWebSocketConnect>) -> Module {
Module::new()
.name("Flowy-Network")
.data(ws_manager)
.data(ws_conn)
.event(NetworkEvent::UpdateNetworkType, update_network_ty)
}

View File

@ -0,0 +1,54 @@
use crate::services::ws_conn::{FlowyRawWebSocket, FlowyWSSender};
use flowy_error::internal_error;
pub use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use lib_ws::{WSController, WSSender};
use std::sync::Arc;
use tokio::sync::broadcast::Receiver;
impl FlowyRawWebSocket for Arc<WSController> {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
Ok(())
})
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
let controller = self.clone();
FutureResult::new(async move {
controller.stop().await;
Ok(())
})
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
Ok(())
})
}
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.add_ws_message_receiver(receiver).map_err(internal_error)?;
Ok(())
}
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
let sender = self.ws_message_sender().map_err(internal_error)?;
Ok(sender)
}
}
impl FlowyWSSender for WSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.send_msg(msg).map_err(internal_error)?;
Ok(())
}
}

View File

@ -0,0 +1,3 @@
pub use http_ws_impl::*;
mod http_ws_impl;

View File

@ -6,12 +6,16 @@ use flowy_collaboration::{
ws::{DocumentClientWSData, DocumentClientWSDataType},
},
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
protobuf::{
DocumentClientWSData as DocumentClientWSDataPB,
RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB,
},
sync::*,
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSModule, WebSocketRawMessage};
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{
convert::TryInto,
fmt::{Debug, Formatter},
@ -19,62 +23,76 @@ use std::{
};
use tokio::sync::mpsc;
pub struct MockDocServer {
pub manager: Arc<ServerDocumentManager>,
}
impl std::default::Default for MockDocServer {
fn default() -> Self {
let persistence = Arc::new(MockDocServerPersistence::default());
let manager = Arc::new(ServerDocumentManager::new(persistence));
MockDocServer { manager }
pub(crate) fn spawn_server(receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>) -> Arc<LocalDocumentServer> {
let (server_tx, mut server_rx) = mpsc::unbounded_channel();
let server = Arc::new(LocalDocumentServer::new(server_tx));
tokio::spawn(async move {
while let Some(message) = server_rx.recv().await {
match receivers.get(&message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", message),
Some(handler) => handler.receive_message(message.clone()),
}
}
});
server
}
impl MockDocServer {
pub async fn handle_client_data(
&self,
client_data: DocumentClientWSData,
) -> Option<mpsc::Receiver<WebSocketRawMessage>> {
match client_data.ty {
DocumentClientWSDataType::ClientPushRev => {
let (tx, rx) = mpsc::channel(1);
let user = Arc::new(MockDocUser {
pub struct LocalDocumentServer {
pub doc_manager: Arc<ServerDocumentManager>,
sender: mpsc::UnboundedSender<WebSocketRawMessage>,
}
impl LocalDocumentServer {
pub fn new(sender: mpsc::UnboundedSender<WebSocketRawMessage>) -> Self {
let persistence = Arc::new(LocalDocServerPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence));
LocalDocumentServer { doc_manager, sender }
}
pub async fn handle_client_data(&self, client_data: DocumentClientWSData) -> Result<(), CollaborateError> {
tracing::debug!(
"[LocalDocumentServer] receive client data: {}:{:?} ",
client_data.doc_id,
client_data.ty
);
let user = Arc::new(LocalDocumentUser {
user_id: "fake_user_id".to_owned(),
tx,
ws_sender: self.sender.clone(),
});
let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData =
client_data.try_into().unwrap();
self.manager
.handle_client_revisions(user, pb_client_data)
.await
.unwrap();
Some(rx)
let ty = client_data.ty.clone();
let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap();
match ty {
DocumentClientWSDataType::ClientPushRev => {
let _ = self
.doc_manager
.handle_client_revisions(user, document_client_data)
.await?;
},
DocumentClientWSDataType::ClientPing => {
todo!()
let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
},
}
Ok(())
}
}
struct MockDocServerPersistence {
struct LocalDocServerPersistence {
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for MockDocServerPersistence {
impl Debug for LocalDocServerPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") }
}
impl std::default::Default for MockDocServerPersistence {
impl std::default::Default for LocalDocServerPersistence {
fn default() -> Self {
MockDocServerPersistence {
LocalDocServerPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl DocumentPersistence for MockDocServerPersistence {
impl DocumentPersistence for LocalDocServerPersistence {
fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
@ -118,16 +136,16 @@ impl DocumentPersistence for MockDocServerPersistence {
}
#[derive(Debug)]
struct MockDocUser {
struct LocalDocumentUser {
user_id: String,
tx: mpsc::Sender<WebSocketRawMessage>,
ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
}
impl RevisionUser for MockDocUser {
impl RevisionUser for LocalDocumentUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn receive(&self, resp: SyncResponse) {
let sender = self.tx.clone();
let sender = self.ws_sender.clone();
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
@ -136,7 +154,7 @@ impl RevisionUser for MockDocUser {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
sender.send(msg).unwrap();
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
@ -144,7 +162,7 @@ impl RevisionUser for MockDocUser {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
sender.send(msg).unwrap();
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
@ -152,7 +170,7 @@ impl RevisionUser for MockDocUser {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
sender.send(msg).unwrap();
},
SyncResponse::NewRevision(_) => {
// unimplemented!()

View File

@ -0,0 +1,105 @@
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::*;
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage};
use crate::services::{
local_ws::local_server::{spawn_server, LocalDocumentServer},
ws_conn::{FlowyRawWebSocket, FlowyWSSender},
};
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver};
pub struct LocalWebSocket {
receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: LocalWSSender,
server: Arc<LocalDocumentServer>,
}
impl std::default::Default for LocalWebSocket {
fn default() -> Self {
let (state_sender, _) = broadcast::channel(16);
let ws_sender = LocalWSSender::default();
let receivers = Arc::new(DashMap::new());
let server = spawn_server(receivers.clone());
LocalWebSocket {
receivers,
state_sender,
ws_sender,
server,
}
}
}
impl LocalWebSocket {
fn spawn_client(&self, _addr: String) {
let mut ws_receiver = self.ws_sender.subscribe();
let server = self.server.clone();
tokio::spawn(async move {
loop {
match ws_receiver.recv().await {
Ok(message) => {
let fut = || async {
let bytes = Bytes::from(message.data);
let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?;
let _ = server.handle_client_data(client_data).await?;
Ok::<(), FlowyError>(())
};
match fut().await {
Ok(_) => {},
Err(e) => tracing::error!("[LocalWebSocket] error: {:?}", e),
}
},
Err(e) => tracing::error!("[LocalWebSocket] error: {}", e),
}
}
});
}
}
impl FlowyRawWebSocket for LocalWebSocket {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
self.spawn_client(addr);
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
self.receivers.insert(receiver.source(), receiver);
Ok(())
}
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
#[derive(Clone)]
struct LocalWSSender(broadcast::Sender<WebSocketRawMessage>);
impl std::default::Default for LocalWSSender {
fn default() -> Self {
let (tx, _) = broadcast::channel(16);
Self(tx)
}
}
impl FlowyWSSender for LocalWSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}
impl std::ops::Deref for LocalWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
}

View File

@ -0,0 +1,4 @@
mod local_server;
mod local_ws_impl;
pub use local_ws_impl::*;

View File

@ -1,4 +1,3 @@
pub mod ws;
// #[cfg(feature = "flowy_unit_test")]
// mod mock;
pub mod http_ws;
pub mod local_ws;
pub mod ws_conn;

View File

@ -1,3 +0,0 @@
pub use conn::*;
mod conn;

View File

@ -1,37 +1,37 @@
use crate::entities::NetworkType;
use flowy_error::internal_error;
pub use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use lib_ws::{WSController, WSSender};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver};
use tokio::sync::broadcast;
pub trait FlowyWebSocket: Send + Sync {
pub trait FlowyRawWebSocket: Send + Sync {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>;
fn stop_connect(&self) -> FutureResult<(), FlowyError>;
fn subscribe_connect_state(&self) -> broadcast::Receiver<WSConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError>;
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError>;
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError>;
}
pub trait FlowyWSSender: Send + Sync {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError>;
}
pub struct FlowyWSConnect {
inner: Arc<dyn FlowyWebSocket>,
pub struct FlowyWebSocketConnect {
inner: Arc<dyn FlowyRawWebSocket>,
connect_type: RwLock<NetworkType>,
status_notifier: broadcast::Sender<NetworkType>,
addr: String,
}
impl FlowyWSConnect {
pub fn new(addr: String, ws: Arc<dyn FlowyWebSocket>) -> Self {
impl FlowyWebSocketConnect {
pub fn new(addr: String, ws: Arc<dyn FlowyRawWebSocket>) -> Self {
let (status_notifier, _) = broadcast::channel(10);
FlowyWSConnect {
FlowyWebSocketConnect {
inner: ws,
connect_type: RwLock::new(NetworkType::default()),
status_notifier,
@ -76,19 +76,19 @@ impl FlowyWSConnect {
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.inner.add_message_receiver(handler)?;
pub fn add_ws_message_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.inner.add_receiver(receiver)?;
Ok(())
}
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { self.inner.ws_sender() }
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { self.inner.sender() }
}
#[tracing::instrument(level = "debug", skip(manager))]
pub fn listen_on_websocket(manager: Arc<FlowyWSConnect>) {
#[tracing::instrument(level = "debug", skip(ws_conn))]
pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
if cfg!(feature = "http_server") {
let ws = manager.inner.clone();
let mut notify = manager.inner.subscribe_connect_state();
let ws = ws_conn.inner.clone();
let mut notify = ws_conn.inner.subscribe_connect_state();
let _ = tokio::spawn(async move {
loop {
match notify.recv().await {
@ -113,7 +113,7 @@ pub fn listen_on_websocket(manager: Arc<FlowyWSConnect>) {
};
}
async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
async fn retry_connect(ws: Arc<dyn FlowyRawWebSocket>, count: usize) {
match ws.reconnect(count).await {
Ok(_) => {},
Err(e) => {
@ -121,48 +121,3 @@ async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
},
}
}
impl FlowyWebSocket for Arc<WSController> {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
Ok(())
})
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
let controller = self.clone();
FutureResult::new(async move {
controller.stop().await;
Ok(())
})
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
Ok(())
})
}
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.add_receiver(handler).map_err(internal_error)?;
Ok(())
}
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
let sender = self.sender().map_err(internal_error)?;
Ok(sender)
}
}
impl FlowyWSSender for WSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.send_msg(msg).map_err(internal_error)?;
Ok(())
}
}

View File

@ -10,7 +10,6 @@ lib-dispatch = { path = "../lib-dispatch" }
lib-log = { path = "../lib-log" }
flowy-user = { path = "../flowy-user" }
flowy-net = { path = "../flowy-net" }
flowy-virtual-net = { path = "../flowy-virtual-net" }
flowy-core = { path = "../flowy-core", default-features = false }
flowy-database = { path = "../flowy-database" }
flowy-document = { path = "../flowy-document" }

View File

@ -6,7 +6,7 @@ use flowy_document::{
core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
errors::{internal_error, FlowyError},
};
use flowy_net::services::ws::FlowyWSConnect;
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use flowy_user::services::user::UserSession;
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};
@ -14,7 +14,7 @@ use std::{convert::TryInto, path::Path, sync::Arc};
pub struct DocumentDepsResolver();
impl DocumentDepsResolver {
pub fn resolve(
ws_manager: Arc<FlowyWSConnect>,
ws_conn: Arc<FlowyWebSocketConnect>,
user_session: Arc<UserSession>,
) -> (
Arc<dyn DocumentUser>,
@ -24,11 +24,11 @@ impl DocumentDepsResolver {
let user = Arc::new(DocumentUserImpl { user: user_session });
let ws_sender = Arc::new(DocumentWebSocketAdapter {
ws_manager: ws_manager.clone(),
ws_conn: ws_conn.clone(),
});
let ws_receivers = Arc::new(DocumentWSReceivers::new());
let receiver = Arc::new(WSMessageReceiverAdaptor(ws_receivers.clone()));
ws_manager.add_receiver(receiver).unwrap();
ws_conn.add_ws_message_receiver(receiver).unwrap();
(user, ws_receivers, ws_sender)
}
}
@ -61,7 +61,7 @@ impl DocumentUser for DocumentUserImpl {
}
struct DocumentWebSocketAdapter {
ws_manager: Arc<FlowyWSConnect>,
ws_conn: Arc<FlowyWebSocketConnect>,
}
impl DocumentWebSocket for DocumentWebSocketAdapter {
@ -71,13 +71,12 @@ impl DocumentWebSocket for DocumentWebSocketAdapter {
module: WSModule::Doc,
data: bytes.to_vec(),
};
let sender = self.ws_manager.ws_sender().map_err(internal_error)?;
let sender = self.ws_conn.ws_sender().map_err(internal_error)?;
sender.send(msg).map_err(internal_error)?;
Ok(())
}
fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_manager.subscribe_websocket_state() }
fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_conn.subscribe_websocket_state() }
}
struct WSMessageReceiverAdaptor(Arc<DocumentWSReceivers>);

View File

@ -6,40 +6,63 @@ use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
use flowy_document::context::DocumentContext;
use flowy_net::{
entities::NetworkType,
services::ws::{listen_on_websocket, FlowyWSConnect, FlowyWebSocket},
services::{
local_ws::LocalWebSocket,
ws_conn::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
},
};
use flowy_user::{
prelude::UserStatus,
services::user::{UserSession, UserSessionConfig},
};
use flowy_virtual_net::local_web_socket;
use lib_dispatch::prelude::*;
use lib_ws::WSController;
use module::mk_modules;
pub use module::*;
use std::sync::{
use std::{
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::broadcast;
static INIT_LOG: AtomicBool = AtomicBool::new(false);
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct FlowySDKConfig {
name: String,
root: String,
log_filter: String,
server_config: ClientServerConfiguration,
ws: Arc<dyn FlowyRawWebSocket>,
}
impl fmt::Debug for FlowySDKConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FlowySDKConfig")
.field("name", &self.name)
.field("root", &self.root)
.field("server_config", &self.server_config)
.finish()
}
}
impl FlowySDKConfig {
pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self {
pub fn new(
root: &str,
server_config: ClientServerConfiguration,
name: &str,
ws: Option<Arc<dyn FlowyRawWebSocket>>,
) -> Self {
let ws = ws.unwrap_or_else(default_web_socket);
FlowySDKConfig {
name: name.to_owned(),
root: root.to_owned(),
log_filter: crate_log_filter(None),
server_config,
ws,
}
}
@ -73,7 +96,7 @@ pub struct FlowySDK {
pub document_ctx: Arc<DocumentContext>,
pub core: Arc<CoreContext>,
pub dispatcher: Arc<EventDispatcher>,
pub ws_manager: Arc<FlowyWSConnect>,
pub ws_conn: Arc<FlowyWebSocketConnect>,
}
impl FlowySDK {
@ -82,21 +105,18 @@ impl FlowySDK {
init_kv(&config.root);
tracing::debug!("🔥 {:?}", config);
let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
Arc::new(Arc::new(WSController::new()))
} else {
local_web_socket()
};
let ws_manager = Arc::new(FlowyWSConnect::new(config.server_config.ws_addr(), ws));
let ws_conn = Arc::new(FlowyWebSocketConnect::new(
config.server_config.ws_addr(),
config.ws.clone(),
));
let user_session = mk_user_session(&config);
let flowy_document = mk_document(ws_manager.clone(), user_session.clone(), &config.server_config);
let core_ctx = mk_core_context(user_session.clone(), flowy_document.clone(), &config.server_config);
let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config);
let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config);
//
let modules = mk_modules(ws_manager.clone(), core_ctx.clone(), user_session.clone());
let modules = mk_modules(&ws_conn, &core_ctx, &user_session);
let dispatcher = Arc::new(EventDispatcher::construct(|| modules));
_init(&dispatcher, ws_manager.clone(), user_session.clone(), core_ctx.clone());
_init(&dispatcher, &ws_conn, &user_session, &core_ctx);
Self {
config,
@ -104,7 +124,7 @@ impl FlowySDK {
document_ctx: flowy_document,
core: core_ctx,
dispatcher,
ws_manager,
ws_conn,
}
}
@ -113,18 +133,21 @@ impl FlowySDK {
fn _init(
dispatch: &EventDispatcher,
ws_manager: Arc<FlowyWSConnect>,
user_session: Arc<UserSession>,
core: Arc<CoreContext>,
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
core: &Arc<CoreContext>,
) {
let subscribe_user_status = user_session.notifier.subscribe_user_status();
let subscribe_network_type = ws_manager.subscribe_network_ty();
let subscribe_network_type = ws_conn.subscribe_network_ty();
let core = core.clone();
let cloned_core = core.clone();
let user_session = user_session.clone();
let ws_conn = ws_conn.clone();
dispatch.spawn(async move {
user_session.init();
listen_on_websocket(ws_manager.clone());
_listen_user_status(ws_manager.clone(), subscribe_user_status, core.clone()).await;
listen_on_websocket(ws_conn.clone());
_listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await;
});
dispatch.spawn(async move {
@ -133,7 +156,7 @@ fn _init(
}
async fn _listen_user_status(
ws_manager: Arc<FlowyWSConnect>,
ws_conn: Arc<FlowyWebSocketConnect>,
mut subscribe: broadcast::Receiver<UserStatus>,
core: Arc<CoreContext>,
) {
@ -142,19 +165,19 @@ async fn _listen_user_status(
match status {
UserStatus::Login { token } => {
let _ = core.user_did_sign_in(&token).await?;
let _ = ws_manager.start(token).await?;
let _ = ws_conn.start(token).await?;
},
UserStatus::Logout { .. } => {
core.user_did_logout().await;
let _ = ws_manager.stop().await;
let _ = ws_conn.stop().await;
},
UserStatus::Expired { .. } => {
core.user_session_expired().await;
let _ = ws_manager.stop().await;
let _ = ws_conn.stop().await;
},
UserStatus::SignUp { profile, ret } => {
let _ = core.user_did_sign_up(&profile.token).await?;
let _ = ws_manager.start(profile.token.clone()).await?;
let _ = ws_conn.start(profile.token.clone()).await?;
let _ = ret.send(());
},
}
@ -198,20 +221,28 @@ fn mk_user_session(config: &FlowySDKConfig) -> Arc<UserSession> {
}
fn mk_core_context(
user_session: Arc<UserSession>,
flowy_document: Arc<DocumentContext>,
user_session: &Arc<UserSession>,
flowy_document: &Arc<DocumentContext>,
server_config: &ClientServerConfiguration,
) -> Arc<CoreContext> {
let workspace_deps = WorkspaceDepsResolver::new(user_session);
let workspace_deps = WorkspaceDepsResolver::new(user_session.clone());
let (user, database) = workspace_deps.split_into();
init_core(user, database, flowy_document, server_config)
init_core(user, database, flowy_document.clone(), server_config)
}
fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> {
if cfg!(feature = "http_server") {
Arc::new(Arc::new(WSController::new()))
} else {
Arc::new(LocalWebSocket::default())
}
}
pub fn mk_document(
ws_manager: Arc<FlowyWSConnect>,
user_session: Arc<UserSession>,
ws_manager: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<DocumentContext> {
let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager, user_session);
let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager.clone(), user_session.clone());
Arc::new(DocumentContext::new(user, ws_receivers, ws_sender, server_config))
}

View File

@ -1,17 +1,17 @@
use flowy_core::context::CoreContext;
use flowy_net::services::ws::FlowyWSConnect;
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use flowy_user::services::user::UserSession;
use lib_dispatch::prelude::Module;
use std::sync::Arc;
pub fn mk_modules(
ws_manager: Arc<FlowyWSConnect>,
core: Arc<CoreContext>,
user_session: Arc<UserSession>,
ws_conn: &Arc<FlowyWebSocketConnect>,
core: &Arc<CoreContext>,
user_session: &Arc<UserSession>,
) -> Vec<Module> {
let user_module = mk_user_module(user_session);
let core_module = mk_core_module(core);
let network_module = mk_network_module(ws_manager);
let user_module = mk_user_module(user_session.clone());
let core_module = mk_core_module(core.clone());
let network_module = mk_network_module(ws_conn.clone());
vec![user_module, core_module, network_module]
}
@ -19,4 +19,4 @@ fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module
fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) }
fn mk_network_module(ws_manager: Arc<FlowyWSConnect>) -> Module { flowy_net::module::create(ws_manager) }
fn mk_network_module(ws_conn: Arc<FlowyWebSocketConnect>) -> Module { flowy_net::module::create(ws_conn) }

View File

@ -36,4 +36,3 @@ fake = "~2.3.0"
claim = "0.4.0"
futures = "0.3.15"
serial_test = "0.5.1"
flowy-virtual-net = { path = "../flowy-virtual-net", features = ["flowy_unit_test"] }

View File

@ -1,4 +1,5 @@
use crate::{helper::ViewTest, FlowySDKTest};
use backend_service::configuration::get_client_server_configuration;
use flowy_collaboration::entities::revision::RevisionState;
use flowy_document::core::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS};
use lib_ot::{core::Interval, rich_text::RichTextDelta};
@ -6,8 +7,6 @@ use std::sync::Arc;
use tokio::time::{sleep, Duration};
pub enum EditorScript {
StartWs,
StopWs,
InsertText(&'static str, usize),
Delete(Interval),
Replace(Interval, &'static str),
@ -16,8 +15,6 @@ pub enum EditorScript {
AssertNextRevId(Option<i64>),
AssertCurrentRevId(i64),
AssertJson(&'static str),
WaitSyncFinished,
}
pub struct EditorTest {
@ -27,10 +24,11 @@ pub struct EditorTest {
impl EditorTest {
pub async fn new() -> Self {
let sdk = FlowySDKTest::setup();
let server_config = get_client_server_configuration().unwrap();
let sdk = FlowySDKTest::new(server_config, None);
let _ = sdk.init_user().await;
let test = ViewTest::new(&sdk).await;
let editor = sdk.document_ctx.controller.open(&test.view.id).await.unwrap();
let editor = sdk.document_ctx.controller.open_document(&test.view.id).await.unwrap();
Self { sdk, editor }
}
@ -46,17 +44,11 @@ impl EditorTest {
let rev_manager = self.editor.rev_manager();
let cache = rev_manager.revision_cache();
let _user_id = self.sdk.user_session.user_id().unwrap();
let ws_manager = self.sdk.ws_manager.clone();
let token = self.sdk.user_session.token().unwrap();
// let ws_manager = self.sdk.ws_conn.clone();
// let token = self.sdk.user_session.token().unwrap();
let wait_millis = 2 * SYNC_INTERVAL_IN_MILLIS;
match script {
EditorScript::StartWs => {
ws_manager.start(token.clone()).await.unwrap();
},
EditorScript::StopWs => {
ws_manager.stop().await;
},
EditorScript::InsertText(s, offset) => {
self.editor.insert(offset, s).await.unwrap();
},
@ -91,10 +83,6 @@ impl EditorTest {
}
assert_eq!(expected_delta, delta);
},
EditorScript::WaitSyncFinished => {
// Workaround: just wait two seconds
sleep(Duration::from_millis(2000)).await;
},
}
sleep(Duration::from_millis(wait_millis)).await;
}

View File

@ -28,7 +28,7 @@ pub struct WorkspaceTest {
impl WorkspaceTest {
pub async fn new() -> Self {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let _ = sdk.init_user().await;
let workspace = create_workspace(&sdk, "Workspace", "").await;
open_workspace(&sdk, &workspace.id).await;
@ -45,7 +45,7 @@ pub struct AppTest {
impl AppTest {
pub async fn new() -> Self {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let _ = sdk.init_user().await;
let workspace = create_workspace(&sdk, "Workspace", "").await;
open_workspace(&sdk, &workspace.id).await;

View File

@ -4,9 +4,11 @@ pub mod helper;
use crate::helper::*;
use backend_service::configuration::{get_client_server_configuration, ClientServerConfiguration};
use flowy_net::services::ws_conn::FlowyRawWebSocket;
use flowy_sdk::{FlowySDK, FlowySDKConfig};
use flowy_user::entities::UserProfile;
use lib_infra::uuid_string;
use std::sync::Arc;
pub mod prelude {
pub use crate::{event_builder::*, helper::*, *};
@ -14,36 +16,42 @@ pub mod prelude {
}
#[derive(Clone)]
pub struct FlowySDKTest(pub FlowySDK);
pub struct FlowySDKTest {
pub inner: FlowySDK,
pub ws: Option<Arc<dyn FlowyRawWebSocket>>,
}
impl std::ops::Deref for FlowySDKTest {
type Target = FlowySDK;
fn deref(&self) -> &Self::Target { &self.0 }
fn deref(&self) -> &Self::Target { &self.inner }
}
impl FlowySDKTest {
pub fn setup() -> Self {
impl std::default::Default for FlowySDKTest {
fn default() -> Self {
let server_config = get_client_server_configuration().unwrap();
let sdk = Self::setup_with(server_config);
let sdk = Self::new(server_config, None);
std::mem::forget(sdk.dispatcher());
sdk
}
}
pub fn setup_with(server_config: ClientServerConfiguration) -> Self {
let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string()).log_filter("debug");
impl FlowySDKTest {
pub fn new(server_config: ClientServerConfiguration, ws: Option<Arc<dyn FlowyRawWebSocket>>) -> Self {
let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string(), None).log_filter("debug");
let sdk = FlowySDK::new(config);
Self(sdk)
std::mem::forget(sdk.dispatcher());
Self { inner: sdk, ws }
}
pub async fn sign_up(&self) -> SignUpContext {
let context = async_sign_up(self.0.dispatcher()).await;
let context = async_sign_up(self.inner.dispatcher()).await;
context
}
pub async fn init_user(&self) -> UserProfile {
let context = async_sign_up(self.0.dispatcher()).await;
init_user_setting(self.0.dispatcher()).await;
let context = async_sign_up(self.inner.dispatcher()).await;
init_user_setting(self.inner.dispatcher()).await;
context.user_profile
}
}

View File

@ -1 +1 @@
// mod revision_test;
mod revision_test;

View File

@ -17,11 +17,8 @@ async fn doc_sync_test() {
async fn doc_sync_retry_ws_conn() {
let scripts = vec![
InsertText("1", 0),
StopWs,
InsertText("2", 1),
InsertText("3", 2),
StartWs,
WaitSyncFinished,
AssertRevisionState(2, RevisionState::Ack),
AssertRevisionState(3, RevisionState::Ack),
AssertNextRevId(None),

View File

@ -5,7 +5,7 @@ use flowy_user::{errors::ErrorCode, event::UserEvent::*, prelude::*};
#[tokio::test]
async fn sign_up_with_invalid_email() {
for email in invalid_email_test_case() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let request = SignUpRequest {
email: email.to_string(),
name: valid_name(),
@ -27,7 +27,7 @@ async fn sign_up_with_invalid_email() {
#[tokio::test]
async fn sign_up_with_invalid_password() {
for password in invalid_password_test_case() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let request = SignUpRequest {
email: random_email(),
name: valid_name(),
@ -45,7 +45,7 @@ async fn sign_up_with_invalid_password() {
#[tokio::test]
async fn sign_in_success() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let _ = UserModuleEventBuilder::new(test.clone()).event(SignOut).sync_send();
let sign_up_context = test.sign_up().await;
@ -67,7 +67,7 @@ async fn sign_in_success() {
#[tokio::test]
async fn sign_in_with_invalid_email() {
for email in invalid_email_test_case() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let request = SignInRequest {
email: email.to_string(),
password: login_password(),
@ -90,7 +90,7 @@ async fn sign_in_with_invalid_email() {
#[tokio::test]
async fn sign_in_with_invalid_password() {
for password in invalid_password_test_case() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let request = SignInRequest {
email: random_email(),

View File

@ -6,7 +6,7 @@ use serial_test::*;
#[tokio::test]
async fn user_profile_get_failed() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let result = UserModuleEventBuilder::new(sdk)
.event(GetUserProfile)
.assert_error()
@ -18,7 +18,7 @@ async fn user_profile_get_failed() {
#[tokio::test]
#[serial]
async fn user_profile_get() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let user_profile = test.init_user().await;
let user = UserModuleEventBuilder::new(test.clone())
.event(GetUserProfile)
@ -30,7 +30,7 @@ async fn user_profile_get() {
#[tokio::test]
#[serial]
async fn user_update_with_name() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let user = sdk.init_user().await;
let new_name = "hello_world".to_owned();
let request = UpdateUserRequest::new(&user.id).name(&new_name);
@ -51,7 +51,7 @@ async fn user_update_with_name() {
#[tokio::test]
#[serial]
async fn user_update_with_email() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let user = sdk.init_user().await;
let new_email = format!("{}@gmail.com", uuid_string());
let request = UpdateUserRequest::new(&user.id).email(&new_email);
@ -71,7 +71,7 @@ async fn user_update_with_email() {
#[tokio::test]
#[serial]
async fn user_update_with_password() {
let sdk = FlowySDKTest::setup();
let sdk = FlowySDKTest::default();
let user = sdk.init_user().await;
let new_password = "H123world!".to_owned();
let request = UpdateUserRequest::new(&user.id).password(&new_password);
@ -86,7 +86,7 @@ async fn user_update_with_password() {
#[tokio::test]
#[serial]
async fn user_update_with_invalid_email() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let user = test.init_user().await;
for email in invalid_email_test_case() {
let request = UpdateUserRequest::new(&user.id).email(&email);
@ -105,7 +105,7 @@ async fn user_update_with_invalid_email() {
#[tokio::test]
#[serial]
async fn user_update_with_invalid_password() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let user = test.init_user().await;
for password in invalid_password_test_case() {
let request = UpdateUserRequest::new(&user.id).password(&password);
@ -121,7 +121,7 @@ async fn user_update_with_invalid_password() {
#[tokio::test]
#[serial]
async fn user_update_with_invalid_name() {
let test = FlowySDKTest::setup();
let test = FlowySDKTest::default();
let user = test.init_user().await;
let request = UpdateUserRequest::new(&user.id).name("");
UserModuleEventBuilder::new(test.clone())

View File

@ -1,23 +0,0 @@
[package]
name = "flowy-virtual-net"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-net = { path = "../flowy-net" }
bytes = { version = "1.0" }
parking_lot = "0.11"
tokio = {version = "1", features = ["sync"]}
tracing = { version = "0.1", features = ["log"] }
# flowy-collaboration and dashmap would be optional
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
dashmap = {version = "4.0"}
[features]
flowy_unit_test = []
http_server = []

View File

@ -1,12 +0,0 @@
use flowy_net::services::ws::FlowyWebSocket;
use std::sync::Arc;
mod ws;
#[cfg(not(feature = "flowy_unit_test"))]
pub fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(ws::LocalWebSocket::default()) }
#[cfg(feature = "flowy_unit_test")]
mod mock;
#[cfg(feature = "flowy_unit_test")]
pub fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(crate::mock::MockWebSocket::default()) }

View File

@ -1,4 +0,0 @@
mod server;
mod ws_local;
pub use ws_local::*;

View File

@ -1,95 +0,0 @@
use crate::mock::server::MockDocServer;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::*;
use flowy_net::services::ws::*;
use lib_infra::future::FutureResult;
use lib_ws::{WSModule, WebSocketRawMessage};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver};
pub struct MockWebSocket {
receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: MockWSSender,
is_stop: Arc<RwLock<bool>>,
server: Arc<MockDocServer>,
}
impl std::default::Default for MockWebSocket {
fn default() -> Self {
let (state_sender, _) = broadcast::channel(16);
let (ws_sender, _) = broadcast::channel(16);
let server = Arc::new(MockDocServer::default());
MockWebSocket {
receivers: Arc::new(DashMap::new()),
state_sender,
ws_sender: MockWSSender(ws_sender),
is_stop: Arc::new(RwLock::new(false)),
server,
}
}
}
impl FlowyWebSocket for MockWebSocket {
fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = false;
let mut ws_receiver = self.ws_sender.subscribe();
let receivers = self.receivers.clone();
let is_stop = self.is_stop.clone();
let server = self.server.clone();
tokio::spawn(async move {
while let Ok(message) = ws_receiver.recv().await {
if *is_stop.read() {
// do nothing
} else {
let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap();
if let Some(mut rx) = server.handle_client_data(ws_data).await {
let new_ws_message = rx.recv().await.unwrap();
match receivers.get(&new_ws_message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()),
}
}
}
}
});
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = true;
FutureResult::new(async { Ok(()) })
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
self.receivers.insert(handler.source(), handler);
Ok(())
}
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
#[derive(Clone)]
pub struct MockWSSender(broadcast::Sender<WebSocketRawMessage>);
impl FlowyWSSender for MockWSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}
impl std::ops::Deref for MockWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
}

View File

@ -1,3 +0,0 @@
mod ws_local;
pub use ws_local::*;

View File

@ -1,55 +0,0 @@
use flowy_net::services::ws::{
FlowyError,
FlowyWSSender,
FlowyWebSocket,
WSConnectState,
WSMessageReceiver,
WebSocketRawMessage,
};
use lib_infra::future::FutureResult;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver};
pub(crate) struct LocalWebSocket {
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: LocalWSSender,
}
impl std::default::Default for LocalWebSocket {
fn default() -> Self {
let (state_sender, _) = broadcast::channel(16);
let (ws_sender, _) = broadcast::channel(16);
LocalWebSocket {
state_sender,
ws_sender: LocalWSSender(ws_sender),
}
}
}
impl FlowyWebSocket for LocalWebSocket {
fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_message_receiver(&self, _handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> { Ok(()) }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
#[derive(Clone)]
pub struct LocalWSSender(broadcast::Sender<WebSocketRawMessage>);
impl FlowyWSSender for LocalWSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}
impl std::ops::Deref for LocalWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
}

View File

@ -123,11 +123,15 @@ impl ServerDocumentManager {
}
let mut write_guard = self.open_doc_map.write().await;
let doc = self.persistence.read_doc(doc_id).await.unwrap();
match self.persistence.read_doc(doc_id).await {
Ok(doc) => {
let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap();
write_guard.insert(doc_id.to_owned(), handler.clone());
drop(write_guard);
Some(handler)
},
Err(_) => None,
}
}
#[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]

View File

@ -59,7 +59,7 @@ impl std::default::Default for WSController {
impl WSController {
pub fn new() -> Self { WSController::default() }
pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), WSError> {
pub fn add_ws_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), WSError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
log::error!("WsSource's {:?} is already registered", source);
@ -133,7 +133,7 @@ impl WSController {
pub fn subscribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state_notify.subscribe() }
pub fn sender(&self) -> Result<Arc<WSSender>, WSError> {
pub fn ws_message_sender(&self) -> Result<Arc<WSSender>, WSError> {
match self.sender_ctrl.read().sender() {
None => Err(WSError::internal().context("WsSender is not initialized, should call connect first")),
Some(sender) => Ok(sender),