fix warnings

This commit is contained in:
appflowy 2021-12-15 16:28:18 +08:00
parent 84d5d2c2f1
commit ef6e777ec0
27 changed files with 132 additions and 71 deletions

1
backend/Cargo.lock generated
View File

@ -1423,6 +1423,7 @@ dependencies = [
"flowy-collaboration",
"flowy-core",
"flowy-document",
"flowy-net",
"flowy-sdk",
"flowy-user",
"futures-util",

View File

@ -103,5 +103,5 @@ flowy-sdk = { path = "../frontend/rust-lib/flowy-sdk", features = ["http_server"
flowy-user = { path = "../frontend/rust-lib/flowy-user", features = ["http_server"] }
flowy-document = { path = "../frontend/rust-lib/flowy-document", features = ["flowy_unit_test", "http_server"] }
flowy-test = { path = "../frontend/rust-lib/flowy-test" }
flowy-net = { path = "../frontend/rust-lib/flowy-net" }
flowy-net = { path = "../frontend/rust-lib/flowy-net", features = ["http_server"] }

View File

@ -59,7 +59,7 @@ impl RevisionUser for ServerDocUser {
match result {
Ok(_) => {},
Err(e) => log::error!("{}", e),
Err(e) => log::error!("[ServerDocUser]: {}", e),
}
}
}

View File

@ -197,7 +197,7 @@ async fn delta_sync_while_local_rev_greater_than_server_rev() {
DocScript::ClientInsertText(6, "efg"),
DocScript::ClientConnectWs,
DocScript::AssertClient(r#"[{"insert":"123abcefg\n"}]"#),
// DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#, 3),
DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#, 3),
])
.await;
}

View File

@ -123,7 +123,7 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
.unwrap();
},
DocScript::AssertClient(s) => {
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(2000)).await;
let json = context.read().client_edit_context().doc_json().await.unwrap();
assert_eq(s, &json);
},

View File

@ -48,11 +48,12 @@ pin-project = "1.0.0"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }
flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]}
flowy-net = { path = "../flowy-net", features = ["ws_mock"] }
color-eyre = { version = "0.5", default-features = false }
criterion = "0.3"
rand = "0.7.3"
env_logger = "0.8.2"
flowy-net = { path = "../flowy-net", features = ["ws_mock"] }
[features]
http_server = []

View File

@ -1,9 +1,8 @@
use crate::{
errors::FlowyError,
services::{
doc::{controller::DocController, edit::ClientDocEditor},
doc::{controller::DocController, edit::ClientDocEditor, WsDocumentManager},
server::construct_doc_server,
ws::WsDocumentManager,
},
};
use backend_service::configuration::ClientServerConfiguration;

View File

@ -5,9 +5,9 @@ use crate::{
doc::{
edit::{ClientDocEditor, EditDocWsHandler},
revision::{RevisionCache, RevisionManager, RevisionServer},
WsDocumentManager,
},
server::Server,
ws::WsDocumentManager,
},
};
use bytes::Bytes;

View File

@ -1,12 +1,11 @@
use crate::{
errors::FlowyError,
module::DocumentUser,
services::{
doc::{
edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas},
revision::{RevisionDownStream, RevisionManager, SteamStopTx},
},
ws::{DocumentWebSocket, WsDocumentHandler},
services::doc::{
edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas},
revision::{RevisionDownStream, RevisionManager, SteamStopTx},
DocumentWebSocket,
WsDocumentHandler,
},
};
use bytes::Bytes;
@ -64,7 +63,7 @@ impl ClientDocEditor {
stop_sync_tx,
});
// edit_doc.notify_open_doc();
edit_doc.connect_to_doc();
start_sync(edit_doc.clone(), ws_msg_rx, cloned_stop_sync_tx);
Ok(edit_doc)
@ -192,7 +191,7 @@ impl ClientDocEditor {
}
#[tracing::instrument(level = "debug", skip(self))]
fn notify_open_doc(&self) {
fn connect_to_doc(&self) {
let rev_id: RevId = self.rev_manager.rev_id().into();
if let Ok(user_id) = self.user.user_id() {
let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws_sender);
@ -294,7 +293,7 @@ impl WsDocumentHandler for EditDocWsHandler {
match state {
WsConnectState::Init => {},
WsConnectState::Connecting => {},
WsConnectState::Connected => self.notify_open_doc(),
WsConnectState::Connected => self.connect_to_doc(),
WsConnectState::Disconnected => {},
}
}

View File

@ -1,11 +1,11 @@
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt::skip)]
use crate::{errors::FlowyError, services::ws::DocumentWebSocket};
use crate::{errors::FlowyError};
use futures::future::BoxFuture;
use lib_infra::retry::Action;
use lib_ot::revision::RevId;
use std::{future, sync::Arc};
use crate::services::doc::DocumentWebSocket;
#[allow(dead_code)]
pub(crate) struct OpenDocAction {

View File

@ -2,3 +2,8 @@ pub mod edit;
pub mod revision;
pub(crate) mod controller;
mod ws_manager;
pub use ws_manager::*;
pub const SYNC_INTERVAL_IN_MILLIS: u64 = 500;

View File

@ -177,15 +177,12 @@ impl RevisionIterator for RevisionCache {
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
match memory_cache.front_local_revision().await {
None => {
//
match memory_cache.front_local_rev_id().await {
None => match memory_cache.front_local_rev_id().await {
None => Ok(None),
Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
None => Ok(None),
Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
None => Ok(None),
Some(record) => Ok(Some(record)),
},
}
Some(record) => Ok(Some(record)),
},
},
Some((_, record)) => Ok(Some(record)),
}

View File

@ -56,7 +56,7 @@ impl RevisionMemoryCache {
}
match self.local_revs.write().await.pop_front() {
None => tracing::error!("❌The local_revs should not be empty"),
None => {},
Some(pop_rev_id) => {
if &pop_rev_id != rev_id {
tracing::error!("The front rev_id:{} not equal to ack rev_id: {}", pop_rev_id, rev_id);

View File

@ -1,8 +1,8 @@
use crate::{
errors::FlowyError,
services::{
doc::revision::{RevisionCache, RevisionUpStream, SteamStopRx},
ws::DocumentWebSocket,
services::doc::{
revision::{RevisionCache, RevisionUpStream, SteamStopRx},
DocumentWebSocket,
},
};
use flowy_collaboration::{entities::doc::Doc, util::RevIdCounter};

View File

@ -1,9 +1,8 @@
use crate::services::{
doc::{
edit::ClientDocEditor,
revision::{RevisionIterator, RevisionManager},
},
ws::DocumentWebSocket,
use crate::services::doc::{
edit::ClientDocEditor,
revision::{RevisionIterator, RevisionManager},
DocumentWebSocket,
SYNC_INTERVAL_IN_MILLIS,
};
use async_stream::stream;
use bytes::Bytes;
@ -161,7 +160,7 @@ impl RevisionUpStream {
.for_each(|msg| async {
match self.handle_msg(msg).await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
Err(e) => log::error!("[RevisionUpStream]: send msg failed, {:?}", e),
}
})
.await;
@ -175,23 +174,26 @@ impl RevisionUpStream {
async fn send_next_revision(&self) -> FlowyResult<()> {
match self.revisions.next().await? {
None => Ok(()),
None => {
tracing::debug!("Finish synchronizing revisions");
Ok(())
},
Some(record) => {
tracing::debug!(
"[RevisionUpStream]: processes revision: {}:{:?}",
record.revision.doc_id,
record.revision.rev_id
);
let _ = self.ws_sender.send(record.revision.into()).map_err(internal_error);
// let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
Ok(())
self.ws_sender.send(record.revision.into()).map_err(internal_error)
// let _ = tokio::time::timeout(Duration::from_millis(2000),
// ret.recv()).await;
},
}
}
}
async fn tick(sender: mpsc::UnboundedSender<UpStreamMsg>) {
let mut i = interval(Duration::from_secs(2));
let mut i = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
while sender.send(UpStreamMsg::Tick).is_ok() {
i.tick().await;
}

View File

@ -1,3 +1,2 @@
pub mod doc;
pub mod server;
pub mod ws;

View File

@ -1,2 +0,0 @@
mod ws_manager;
pub use ws_manager::*;

View File

@ -9,7 +9,7 @@ async fn doc_rev_state_test1() {
AssertRevisionState(1, RevState::StateLocal),
SimulateAckedMessage(1),
AssertRevisionState(1, RevState::Acked),
AssertNextSendingRevision(None),
AssertNextRevId(None),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
EditorTest::new().await.run_scripts(scripts).await;
@ -27,11 +27,11 @@ async fn doc_rev_state_test2() {
AssertRevisionState(3, RevState::StateLocal),
SimulateAckedMessage(1),
AssertRevisionState(1, RevState::Acked),
AssertNextSendingRevision(Some(2)),
AssertNextRevId(Some(2)),
SimulateAckedMessage(2),
AssertRevisionState(2, RevState::Acked),
//
AssertNextSendingRevision(Some(3)),
AssertNextRevId(Some(3)),
AssertRevisionState(3, RevState::StateLocal),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
@ -57,6 +57,35 @@ async fn doc_sync_test() {
InsertText("2", 1),
InsertText("3", 2),
AssertJson(r#"[{"insert":"123\n"}]"#),
AssertNextRevId(None),
];
EditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
async fn doc_sync_lost_ws_conn() {
let scripts = vec![
InsertText("1", 0),
StopWs,
InsertText("2", 1),
AssertNextRevId(Some(2)),
InsertText("3", 2),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
EditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
async fn doc_sync_retry_ws_conn() {
let scripts = vec![
InsertText("1", 0),
StopWs,
InsertText("2", 1),
InsertText("3", 2),
StartWs,
WaitSyncFinished,
AssertNextRevId(None),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
EditorTest::new().await.run_scripts(scripts).await;
}

View File

@ -25,4 +25,5 @@ lazy_static = {version = "1.4.0", optional = true}
dashmap = {version = "4.0", optional = true}
[features]
ws_mock = ["flowy-collaboration", "lazy_static", "dashmap"]
ws_mock = ["flowy-collaboration", "lazy_static", "dashmap"]
http_server = []

View File

@ -14,6 +14,7 @@ use flowy_collaboration::{
use lazy_static::lazy_static;
use lib_infra::future::{FutureResult, FutureResultSend};
use lib_ws::WsModule;
use parking_lot::RwLock;
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
@ -24,6 +25,7 @@ pub struct MockWebSocket {
handlers: DashMap<WsModule, Arc<dyn WsMessageHandler>>,
state_sender: broadcast::Sender<WsConnectState>,
ws_sender: broadcast::Sender<WsMessage>,
is_stop: RwLock<bool>,
}
impl std::default::Default for MockWebSocket {
@ -34,6 +36,7 @@ impl std::default::Default for MockWebSocket {
handlers: DashMap::new(),
state_sender,
ws_sender,
is_stop: RwLock::new(false),
}
}
}
@ -44,16 +47,22 @@ impl MockWebSocket {
impl FlowyWebSocket for Arc<MockWebSocket> {
fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = false;
let mut ws_receiver = self.ws_sender.subscribe();
let cloned_ws = self.clone();
tokio::spawn(async move {
while let Ok(message) = ws_receiver.recv().await {
let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap();
let mut rx = DOC_SERVER.handle_ws_data(ws_data).await;
let new_ws_message = rx.recv().await.unwrap();
match cloned_ws.handlers.get(&new_ws_message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()),
if *cloned_ws.is_stop.read() {
// do nothing
} else {
let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap();
let mut rx = DOC_SERVER.handle_ws_data(ws_data).await;
let new_ws_message = rx.recv().await.unwrap();
match cloned_ws.handlers.get(&new_ws_message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()),
}
}
}
});
@ -61,13 +70,16 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = true;
FutureResult::new(async { Ok(()) })
}
fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
fn add_ws_message_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
tracing::error!("WsSource's {:?} is already registered", source);

View File

@ -10,7 +10,7 @@ pub trait FlowyWebSocket: Send + Sync {
fn stop_connect(&self) -> FutureResult<(), FlowyError>;
fn subscribe_connect_state(&self) -> broadcast::Receiver<WsConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError>;
fn add_ws_message_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError>;
}

View File

@ -35,13 +35,13 @@ impl WsManager {
pub async fn start(&self, token: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, token);
self.inner.stop_connect().await;
self.inner.stop_connect().await?;
let _ = self.inner.start_connect(addr).await?;
Ok(())
}
pub async fn stop(&self) { self.inner.stop_connect().await; }
pub async fn stop(&self) { let _ = self.inner.stop_connect().await; }
pub fn update_network_type(&self, new_type: &NetworkType) {
tracing::debug!("Network new state: {:?}", new_type);
@ -72,7 +72,7 @@ impl WsManager {
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let _ = self.inner.add_handler(handler)?;
let _ = self.inner.add_ws_message_handler(handler)?;
Ok(())
}
@ -139,8 +139,8 @@ impl FlowyWebSocket for Arc<WsController> {
})
}
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let _ = self.add_handler(handler)?;
fn add_ws_message_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let _ = self.add_handler(handler).map_err(internal_error)?;
Ok(())
}

View File

@ -28,7 +28,7 @@ impl FlowyWebSocket for Arc<LocalWebSocket> {
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_handler(&self, _handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> { Ok(()) }
fn add_ws_message_handler(&self, _handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> { Ok(()) }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}

View File

@ -4,7 +4,7 @@ use flowy_database::ConnectionPool;
use flowy_document::{
errors::{internal_error, FlowyError},
module::DocumentUser,
services::ws::{DocumentWebSocket, WsDocumentManager, WsStateReceiver},
services::doc::{DocumentWebSocket, WsDocumentManager, WsStateReceiver},
};
use flowy_net::services::ws::WsManager;
use flowy_user::services::user::UserSession;

View File

@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
flowy-sdk = { path = "../flowy-sdk"}
flowy-user = { path = "../flowy-user"}
flowy-net = { path = "../flowy-net"}
flowy-core = { path = "../flowy-core", default-features = false}
flowy-document = { path = "../flowy-document"}
lib-dispatch = { path = "../lib-dispatch" }

View File

@ -3,7 +3,8 @@ use flowy_collaboration::entities::{
doc::DocIdentifier,
ws::{WsDocumentData, WsDocumentDataBuilder},
};
use flowy_document::services::doc::{edit::ClientDocEditor, revision::RevisionIterator};
use flowy_document::services::doc::{edit::ClientDocEditor, revision::RevisionIterator, SYNC_INTERVAL_IN_MILLIS};
use lib_ot::{
core::Interval,
revision::{RevState, RevType, Revision, RevisionRange},
@ -13,16 +14,19 @@ use std::sync::Arc;
use tokio::time::{sleep, Duration};
pub enum EditorScript {
StartWs,
StopWs,
InsertText(&'static str, usize),
Delete(Interval),
Replace(Interval, &'static str),
Undo(),
Redo(),
WaitSyncFinished,
SimulatePushRevisionMessageWithDelta(RichTextDelta),
SimulatePullRevisionMessage(RevisionRange),
SimulateAckedMessage(i64),
AssertRevisionState(i64, RevState),
AssertNextSendingRevision(Option<i64>),
AssertNextRevId(Option<i64>),
AssertCurrentRevId(i64),
AssertJson(&'static str),
}
@ -47,7 +51,7 @@ impl EditorTest {
self.run_script(script).await;
}
sleep(Duration::from_secs(10)).await;
sleep(Duration::from_secs(5)).await;
}
async fn run_script(&mut self, script: EditorScript) {
@ -57,11 +61,20 @@ impl EditorTest {
let _disk_cache = cache.dish_cache();
let doc_id = self.editor.doc_id.clone();
let user_id = self.sdk.user_session.user_id().unwrap();
let ws_manager = self.sdk.ws_manager.clone();
let token = self.sdk.user_session.token().unwrap();
match script {
EditorScript::StartWs => {
ws_manager.start(token.clone()).await.unwrap();
},
EditorScript::StopWs => {
sleep(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)).await;
ws_manager.stop().await;
},
EditorScript::InsertText(s, offset) => {
self.editor.insert(offset, s).await.unwrap();
sleep(Duration::from_millis(200)).await;
sleep(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)).await;
},
EditorScript::Delete(interval) => {
self.editor.delete(interval).await.unwrap();
@ -75,6 +88,9 @@ impl EditorTest {
EditorScript::Redo() => {
self.editor.redo().await.unwrap();
},
EditorScript::WaitSyncFinished => {
sleep(Duration::from_millis(1000)).await;
},
EditorScript::AssertRevisionState(rev_id, state) => {
let record = cache.query_revision(&doc_id, rev_id).await.unwrap();
assert_eq!(record.state, state);
@ -82,10 +98,11 @@ impl EditorTest {
EditorScript::AssertCurrentRevId(rev_id) => {
assert_eq!(self.editor.rev_manager().rev_id(), rev_id);
},
EditorScript::AssertNextSendingRevision(rev_id) => {
EditorScript::AssertNextRevId(rev_id) => {
let next_revision = cache.next().await.unwrap();
if rev_id.is_none() {
assert_eq!(next_revision.is_none(), true);
return;
}
let next_revision = next_revision.unwrap();