rename files

This commit is contained in:
appflowy 2021-10-05 11:46:56 +08:00
parent d0111e30dc
commit ad5bc23296
30 changed files with 176 additions and 137 deletions

View File

@ -8,7 +8,7 @@ use flowy_document::protobuf::{Doc, Revision};
use flowy_net::errors::{internal_error, Result as DocResult, ServerError};
use futures::stream::StreamExt;
use sqlx::PgPool;
use std::sync::Arc;
use std::sync::{atomic::Ordering::SeqCst, Arc};
use tokio::{
sync::{mpsc, oneshot},
task::spawn_blocking,
@ -35,6 +35,9 @@ pub enum EditMsg {
DocumentJson {
ret: oneshot::Sender<DocResult<String>>,
},
DocumentRevId {
ret: oneshot::Sender<DocResult<i64>>,
},
NewDocUser {
user: Arc<WsUser>,
socket: Socket,
@ -97,6 +100,10 @@ impl EditDocActor {
.map_err(internal_error);
let _ = ret.send(json);
},
EditMsg::DocumentRevId { ret } => {
let edit_context = self.edit_doc.clone();
let _ = ret.send(Ok(edit_context.rev_id.load(SeqCst)));
},
EditMsg::NewDocUser {
user,
socket,

View File

@ -32,8 +32,8 @@ use std::{
};
pub struct ServerEditDoc {
doc_id: String,
rev_id: AtomicI64,
pub doc_id: String,
pub rev_id: AtomicI64,
document: Arc<RwLock<Document>>,
users: DashMap<String, EditUser>,
}
@ -121,7 +121,7 @@ impl ServerEditDoc {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let cli_revision = self.transform_client_revision(&revision)?;
let cli_revision = self.transform_revision(&revision)?;
let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
},
@ -137,8 +137,16 @@ impl ServerEditDoc {
Ok(())
}
fn transform_client_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
#[tracing::instrument(level = "debug", skip(self, revision))]
fn transform_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
let cli_delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
let (cli_prime, server_prime) = self
.document
.read()
.delta()
.transform(&cli_delta)
.map_err(internal_error)?;
let _ = self.compose_delta(server_prime)?;
let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
Ok(cli_revision)
@ -159,19 +167,14 @@ impl ServerEditDoc {
revision
}
#[tracing::instrument(level = "debug", skip(self, delta_data))]
fn transform(&self, delta_data: &Vec<u8>) -> Result<(Delta, Delta), OTError> {
log::debug!("Document: {}", self.document.read().to_json());
let doc_delta = self.document.read().delta().clone();
let cli_delta = Delta::from_bytes(delta_data)?;
log::debug!("Compose delta: {}", cli_delta);
let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?;
Ok((cli_prime, server_prime))
}
#[tracing::instrument(level = "debug", skip(self), err)]
#[tracing::instrument(
level = "debug",
skip(self, delta),
fields(
delta = %delta.to_json(),
result,
)
)]
fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
// Opti: push each revision into queue and process it one by one.
match self.document.try_write_for(Duration::from_millis(300)) {
@ -180,7 +183,7 @@ impl ServerEditDoc {
},
Some(mut write_guard) => {
let _ = write_guard.compose_delta(&delta).map_err(internal_error)?;
log::debug!("Document: {}", write_guard.to_json());
tracing::Span::current().record("result", &write_guard.to_json().as_str());
},
}
Ok(())

View File

@ -18,7 +18,6 @@ impl DocHandle {
let (sender, receiver) = mpsc::channel(100);
let actor = EditDocActor::new(receiver, doc, pg_pool)?;
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
@ -58,6 +57,12 @@ impl DocHandle {
self.send(msg, rx).await?
}
pub async fn rev_id(&self) -> DocResult<i64> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::DocumentRevId { ret };
self.send(msg, rx).await?
}
pub(crate) async fn send<T>(&self, msg: EditMsg, rx: oneshot::Receiver<T>) -> DocResult<T> {
let _ = self.sender.send(msg).await.map_err(internal_error)?;
let result = rx.await?;

View File

@ -1,5 +1,6 @@
use crate::document::helper::{DocScript, DocumentTest};
use flowy_document::services::doc::{Document, FlowyDoc};
use flowy_ot::core::{Attribute, Interval};
#[rustfmt::skip]
// ┌─────────┐ ┌─────────┐
@ -16,15 +17,33 @@ use flowy_document::services::doc::{Document, FlowyDoc};
// └──────────────────────────┘ │ │ └──────────────────────┘
// │ │
#[actix_rt::test]
async fn delta_sync_after_ws_connection() {
async fn delta_sync_while_editing() {
let test = DocumentTest::new().await;
test.run_scripts(vec![
DocScript::ConnectWs,
DocScript::OpenDoc,
DocScript::SendText(0, "abc"),
DocScript::SendText(3, "123"),
DocScript::InsertText(0, "abc"),
DocScript::InsertText(3, "123"),
DocScript::AssertClient(r#"[{"insert":"abc123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#, 2),
])
.await;
}
#[actix_rt::test]
async fn delta_sync_while_editing_with_attribute() {
let test = DocumentTest::new().await;
test.run_scripts(vec![
DocScript::ConnectWs,
DocScript::OpenDoc,
DocScript::InsertText(0, "abc"),
DocScript::FormatText(Interval::new(0, 3), Attribute::Bold(true)),
DocScript::AssertClient(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#, 2),
DocScript::InsertText(3, "efg"),
DocScript::FormatText(Interval::new(3, 5), Attribute::Italic(true)),
DocScript::AssertClient(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#, 4),
])
.await;
}
@ -54,7 +73,7 @@ async fn delta_sync_with_http_request() {
DocScript::SetServerDocument(json, 3),
DocScript::OpenDoc,
DocScript::AssertClient(r#"[{"insert":"123456\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123456\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123456\n"}]"#, 3),
])
.await;
}
@ -116,10 +135,10 @@ async fn delta_sync_while_local_rev_less_than_server_rev() {
test.run_scripts(vec![
DocScript::OpenDoc,
DocScript::SetServerDocument(json, 3),
DocScript::SendText(0, "abc"),
DocScript::InsertText(0, "abc"),
DocScript::ConnectWs,
DocScript::AssertClient(r#"[{"insert":"abc\n123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc\n123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc\n123\n"}]"#, 4),
])
.await;
}
@ -160,11 +179,11 @@ async fn delta_sync_while_local_rev_greater_than_server_rev() {
DocScript::SetServerDocument(json, 1),
DocScript::OpenDoc,
DocScript::AssertClient(r#"[{"insert":"123\n"}]"#),
DocScript::SendText(3, "abc"),
DocScript::SendText(6, "efg"),
DocScript::InsertText(3, "abc"),
DocScript::InsertText(6, "efg"),
DocScript::ConnectWs,
DocScript::AssertClient(r#"[{"insert":"123abcefg\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#, 3),
])
.await;
}

View File

@ -15,6 +15,7 @@ use flowy_user::services::user::UserSession;
use crate::helper::{spawn_server, TestServer};
use flowy_document::protobuf::UpdateDocParams;
use flowy_ot::core::{Attribute, Interval};
use parking_lot::RwLock;
use serde::__private::Formatter;
@ -25,27 +26,14 @@ pub struct DocumentTest {
#[derive(Clone)]
pub enum DocScript {
ConnectWs,
SendText(usize, &'static str),
InsertText(usize, &'static str),
FormatText(Interval, Attribute),
AssertClient(&'static str),
AssertServer(&'static str),
AssertServer(&'static str, i64),
SetServerDocument(String, i64), // delta_json, rev_id
OpenDoc,
}
impl std::fmt::Display for DocScript {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let name = match self {
DocScript::ConnectWs => "ConnectWs",
DocScript::SendText(_, _) => "SendText",
DocScript::AssertClient(_) => "AssertClient",
DocScript::AssertServer(_) => "AssertServer",
DocScript::SetServerDocument(_, _) => "SetServerDocument",
DocScript::OpenDoc => "OpenDoc",
};
f.write_str(&format!("******** {} *********", name))
}
}
impl DocumentTest {
pub async fn new() -> Self {
let server = spawn_server().await;
@ -122,19 +110,28 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
DocScript::OpenDoc => {
context.write().open_doc().await;
},
DocScript::SendText(index, s) => {
DocScript::InsertText(index, s) => {
context.read().client_edit_context().insert(index, s).await.unwrap();
},
DocScript::FormatText(interval, attribute) => {
context
.read()
.client_edit_context()
.format(interval, attribute)
.await
.unwrap();
},
DocScript::AssertClient(s) => {
sleep(Duration::from_millis(100)).await;
let json = context.read().client_edit_context().doc_json().await.unwrap();
assert_eq(s, &json);
},
DocScript::AssertServer(s) => {
DocScript::AssertServer(s, rev_id) => {
sleep(Duration::from_millis(100)).await;
let pg_pool = context.read().pool.clone();
let doc_manager = context.read().doc_manager.clone();
let edit_doc = doc_manager.get(&doc_id, pg_pool).await.unwrap().unwrap();
assert_eq!(edit_doc.rev_id().await.unwrap(), rev_id);
let json = edit_doc.document_json().await.unwrap();
assert_eq(s, &json);
},

View File

@ -61,7 +61,7 @@ pub extern "C" fn sync_command(input: *const u8, len: usize) -> *const u8 {
#[no_mangle]
pub extern "C" fn set_stream_port(port: i64) -> i32 {
flowy_observable::dart::RustStreamSender::set_port(port);
flowy_observable::dart::DartStreamSender::set_port(port);
return 0;
}

View File

@ -1,3 +1,3 @@
proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/observable"]
proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"]
event_files = []

View File

@ -1,7 +1,7 @@
pub mod entities;
pub mod errors;
pub mod module;
mod observable;
mod notify;
pub mod protobuf;
pub mod services;
mod sql_tables;

View File

@ -1,5 +1,5 @@
use flowy_derive::ProtoBuf_Enum;
use flowy_observable::NotifyBuilder;
use flowy_observable::DartNotifyBuilder;
const OBSERVABLE_CATEGORY: &'static str = "Doc";
#[derive(ProtoBuf_Enum, Debug)]
pub(crate) enum DocObservable {
@ -11,6 +11,6 @@ impl std::convert::Into<i32> for DocObservable {
}
#[allow(dead_code)]
pub(crate) fn observable(id: &str, ty: DocObservable) -> NotifyBuilder {
NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
pub(crate) fn dart_notify(id: &str, ty: DocObservable) -> DartNotifyBuilder {
DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
}

View File

@ -3,7 +3,7 @@ use crate::{
errors::{internal_error, DocResult},
services::doc::{
edit::{
message::{EditMsg, TransformDeltas},
message::{DocumentMsg, TransformDeltas},
DocId,
},
Document,
@ -17,19 +17,19 @@ use futures::stream::StreamExt;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{mpsc, RwLock};
pub struct DocumentEditActor {
pub struct DocumentActor {
doc_id: DocId,
document: Arc<RwLock<Document>>,
pool: Arc<ConnectionPool>,
receiver: Option<mpsc::UnboundedReceiver<EditMsg>>,
receiver: Option<mpsc::UnboundedReceiver<DocumentMsg>>,
}
impl DocumentEditActor {
impl DocumentActor {
pub fn new(
doc_id: &str,
delta: Delta,
pool: Arc<ConnectionPool>,
receiver: mpsc::UnboundedReceiver<EditMsg>,
receiver: mpsc::UnboundedReceiver<DocumentMsg>,
) -> Self {
let doc_id = doc_id.to_string();
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
@ -61,13 +61,13 @@ impl DocumentEditActor {
.await;
}
async fn handle_message(&self, msg: EditMsg) -> DocResult<()> {
async fn handle_message(&self, msg: DocumentMsg) -> DocResult<()> {
match msg {
EditMsg::Delta { delta, ret } => {
DocumentMsg::Delta { delta, ret } => {
let result = self.compose_delta(delta).await;
let _ = ret.send(result);
},
EditMsg::RemoteRevision { bytes, ret } => {
DocumentMsg::RemoteRevision { bytes, ret } => {
let revision = Revision::try_from(bytes)?;
let delta = Delta::from_bytes(&revision.delta_data)?;
let rev_id: RevId = revision.rev_id.into();
@ -79,15 +79,15 @@ impl DocumentEditActor {
};
let _ = ret.send(Ok(transform_delta));
},
EditMsg::Insert { index, data, ret } => {
DocumentMsg::Insert { index, data, ret } => {
let delta = self.document.write().await.insert(index, data);
let _ = ret.send(delta);
},
EditMsg::Delete { interval, ret } => {
DocumentMsg::Delete { interval, ret } => {
let result = self.document.write().await.delete(interval);
let _ = ret.send(result);
},
EditMsg::Format {
DocumentMsg::Format {
interval,
attribute,
ret,
@ -95,29 +95,29 @@ impl DocumentEditActor {
let result = self.document.write().await.format(interval, attribute);
let _ = ret.send(result);
},
EditMsg::Replace { interval, data, ret } => {
DocumentMsg::Replace { interval, data, ret } => {
let result = self.document.write().await.replace(interval, data);
let _ = ret.send(result);
},
EditMsg::CanUndo { ret } => {
DocumentMsg::CanUndo { ret } => {
let _ = ret.send(self.document.read().await.can_undo());
},
EditMsg::CanRedo { ret } => {
DocumentMsg::CanRedo { ret } => {
let _ = ret.send(self.document.read().await.can_redo());
},
EditMsg::Undo { ret } => {
DocumentMsg::Undo { ret } => {
let result = self.document.write().await.undo();
let _ = ret.send(result);
},
EditMsg::Redo { ret } => {
DocumentMsg::Redo { ret } => {
let result = self.document.write().await.redo();
let _ = ret.send(result);
},
EditMsg::Doc { ret } => {
DocumentMsg::Doc { ret } => {
let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data));
},
EditMsg::SaveDocument { rev_id, ret } => {
DocumentMsg::SaveDocument { rev_id, ret } => {
let result = self.save_to_disk(rev_id).await;
let _ = ret.send(result);
},

View File

@ -8,8 +8,8 @@ use crate::{
services::{
doc::{
edit::{
edit_actor::DocumentEditActor,
message::{EditMsg, TransformDeltas},
doc_actor::DocumentActor,
message::{DocumentMsg, TransformDeltas},
model::NotifyOpenDocAction,
},
revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor},
@ -31,7 +31,7 @@ pub type DocId = String;
pub struct ClientEditDoc {
pub doc_id: DocId,
rev_manager: Arc<RevisionManager>,
document: UnboundedSender<EditMsg>,
document: UnboundedSender<DocumentMsg>,
ws: Arc<dyn DocumentWebSocket>,
pool: Arc<ConnectionPool>,
user: Arc<dyn DocumentUser>,
@ -64,7 +64,7 @@ impl ClientEditDoc {
pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
let msg = EditMsg::Insert {
let msg = DocumentMsg::Insert {
index,
data: data.to_string(),
ret,
@ -77,7 +77,7 @@ impl ClientEditDoc {
pub async fn delete(&self, interval: Interval) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
let msg = EditMsg::Delete { interval, ret };
let msg = DocumentMsg::Delete { interval, ret };
let _ = self.document.send(msg);
let delta_data = rx.await.map_err(internal_error)??.to_bytes();
let _ = self.mk_revision(&delta_data).await?;
@ -86,7 +86,7 @@ impl ClientEditDoc {
pub async fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
let msg = EditMsg::Format {
let msg = DocumentMsg::Format {
interval,
attribute,
ret,
@ -99,7 +99,7 @@ impl ClientEditDoc {
pub async fn replace<T: ToString>(&mut self, interval: Interval, data: T) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
let msg = EditMsg::Replace {
let msg = DocumentMsg::Replace {
interval,
data: data.to_string(),
ret,
@ -112,35 +112,35 @@ impl ClientEditDoc {
pub async fn can_undo(&self) -> bool {
let (ret, rx) = oneshot::channel::<bool>();
let msg = EditMsg::CanUndo { ret };
let msg = DocumentMsg::CanUndo { ret };
let _ = self.document.send(msg);
rx.await.unwrap_or(false)
}
pub async fn can_redo(&self) -> bool {
let (ret, rx) = oneshot::channel::<bool>();
let msg = EditMsg::CanRedo { ret };
let msg = DocumentMsg::CanRedo { ret };
let _ = self.document.send(msg);
rx.await.unwrap_or(false)
}
pub async fn undo(&self) -> Result<UndoResult, DocError> {
let (ret, rx) = oneshot::channel::<DocResult<UndoResult>>();
let msg = EditMsg::Undo { ret };
let msg = DocumentMsg::Undo { ret };
let _ = self.document.send(msg);
rx.await.map_err(internal_error)?
}
pub async fn redo(&self) -> Result<UndoResult, DocError> {
let (ret, rx) = oneshot::channel::<DocResult<UndoResult>>();
let msg = EditMsg::Redo { ret };
let msg = DocumentMsg::Redo { ret };
let _ = self.document.send(msg);
rx.await.map_err(internal_error)?
}
pub async fn doc(&self) -> DocResult<Doc> {
let (ret, rx) = oneshot::channel::<DocResult<String>>();
let msg = EditMsg::Doc { ret };
let msg = DocumentMsg::Doc { ret };
let _ = self.document.send(msg);
let data = rx.await.map_err(internal_error)??;
let rev_id = self.rev_manager.rev_id();
@ -166,7 +166,7 @@ impl ClientEditDoc {
pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> {
let delta = Delta::from_bytes(&data)?;
let (ret, rx) = oneshot::channel::<DocResult<()>>();
let msg = EditMsg::Delta { delta, ret };
let msg = DocumentMsg::Delta { delta, ret };
let _ = self.document.send(msg);
let _ = rx.await.map_err(internal_error)??;
@ -177,7 +177,7 @@ impl ClientEditDoc {
#[cfg(feature = "flowy_test")]
pub async fn doc_json(&self) -> DocResult<String> {
let (ret, rx) = oneshot::channel::<DocResult<String>>();
let msg = EditMsg::Doc { ret };
let msg = DocumentMsg::Doc { ret };
let _ = self.document.send(msg);
rx.await.map_err(internal_error)?
}
@ -203,7 +203,7 @@ impl ClientEditDoc {
async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> {
// Transform the revision
let (ret, rx) = oneshot::channel::<DocResult<TransformDeltas>>();
let _ = self.document.send(EditMsg::RemoteRevision { bytes, ret });
let _ = self.document.send(DocumentMsg::RemoteRevision { bytes, ret });
let TransformDeltas {
client_prime,
server_prime,
@ -217,7 +217,7 @@ impl ClientEditDoc {
// compose delta
let (ret, rx) = oneshot::channel::<DocResult<()>>();
let msg = EditMsg::Delta {
let msg = DocumentMsg::Delta {
delta: client_prime.clone(),
ret,
};
@ -246,7 +246,7 @@ impl ClientEditDoc {
&self.doc_id,
RevType::Remote,
);
self.ws.send(revision.into());
let _ = self.ws.send(revision.into());
save_document(self.document.clone(), local_rev_id.into()).await;
Ok(())
@ -261,7 +261,7 @@ impl ClientEditDoc {
WsDataType::PullRev => {
let range = RevisionRange::try_from(bytes)?;
let revision = self.rev_manager.construct_revisions(range).await?;
self.ws.send(revision.into());
let _ = self.ws.send(revision.into());
},
WsDataType::NewDocUser => {},
WsDataType::Acked => {
@ -298,9 +298,9 @@ impl WsDocumentHandler for EditDocWsHandler {
}
}
async fn save_document(document: UnboundedSender<EditMsg>, rev_id: RevId) -> DocResult<()> {
async fn save_document(document: UnboundedSender<DocumentMsg>, rev_id: RevId) -> DocResult<()> {
let (ret, rx) = oneshot::channel::<DocResult<()>>();
let _ = document.send(EditMsg::SaveDocument { rev_id, ret });
let _ = document.send(DocumentMsg::SaveDocument { rev_id, ret });
let result = rx.await.map_err(internal_error)?;
result
}
@ -316,9 +316,9 @@ fn spawn_rev_store_actor(
sender
}
fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc<ConnectionPool>) -> UnboundedSender<EditMsg> {
let (sender, receiver) = mpsc::unbounded_channel::<EditMsg>();
let actor = DocumentEditActor::new(&doc_id, delta, pool.clone(), receiver);
fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc<ConnectionPool>) -> UnboundedSender<DocumentMsg> {
let (sender, receiver) = mpsc::unbounded_channel::<DocumentMsg>();
let actor = DocumentActor::new(&doc_id, delta, pool.clone(), receiver);
tokio::spawn(actor.run());
sender
}

View File

@ -6,7 +6,7 @@ use bytes::Bytes;
use tokio::sync::oneshot;
pub type Ret<T> = oneshot::Sender<DocResult<T>>;
pub enum EditMsg {
pub enum DocumentMsg {
Delta {
delta: Delta,
ret: Ret<()>,

View File

@ -1,4 +1,4 @@
mod edit_actor;
mod doc_actor;
mod edit_doc;
mod message;
mod model;

View File

@ -4,15 +4,15 @@ use lazy_static::lazy_static;
use std::{convert::TryInto, sync::RwLock};
lazy_static! {
static ref R2F_STREAM_SENDER: RwLock<RustStreamSender> = RwLock::new(RustStreamSender::new());
static ref DART_STREAM_SENDER: RwLock<DartStreamSender> = RwLock::new(DartStreamSender::new());
}
pub struct RustStreamSender {
pub struct DartStreamSender {
#[allow(dead_code)]
isolate: Option<allo_isolate::Isolate>,
}
impl RustStreamSender {
impl DartStreamSender {
fn new() -> Self { Self { isolate: None } }
fn inner_set_port(&mut self, port: i64) {
@ -33,7 +33,7 @@ impl RustStreamSender {
}
pub fn set_port(port: i64) {
match R2F_STREAM_SENDER.write() {
match DART_STREAM_SENDER.write() {
Ok(mut stream) => stream.inner_set_port(port),
Err(e) => {
let msg = format!("Get rust to flutter stream lock fail. {:?}", e);
@ -44,7 +44,7 @@ impl RustStreamSender {
pub fn post(_observable_subject: ObservableSubject) -> Result<(), String> {
#[cfg(feature = "dart")]
match R2F_STREAM_SENDER.read() {
match DART_STREAM_SENDER.read() {
Ok(stream) => stream.inner_post(_observable_subject),
Err(e) => Err(format!("Get rust to flutter stream lock fail. {:?}", e)),
}

View File

@ -4,10 +4,10 @@ pub mod dart;
pub mod entities;
mod protobuf;
use crate::{dart::RustStreamSender, entities::ObservableSubject};
use crate::{dart::DartStreamSender, entities::ObservableSubject};
use flowy_dispatch::prelude::ToBytes;
pub struct NotifyBuilder {
pub struct DartNotifyBuilder {
id: String,
payload: Option<Bytes>,
error: Option<Bytes>,
@ -15,7 +15,7 @@ pub struct NotifyBuilder {
ty: i32,
}
impl NotifyBuilder {
impl DartNotifyBuilder {
pub fn new<T: Into<i32>>(id: &str, ty: T, source: &str) -> Self {
Self {
id: id.to_owned(),
@ -73,7 +73,7 @@ impl NotifyBuilder {
};
log::debug!("Notify {}", subject);
match RustStreamSender::post(subject) {
match DartStreamSender::post(subject) {
Ok(_) => {},
Err(error) => log::error!("Send observable subject failed: {}", error),
}

View File

@ -1,3 +1,3 @@
proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/observable"]
proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"]
event_files = ["src/event.rs"]

View File

@ -5,7 +5,7 @@ pub mod entities;
pub mod errors;
pub mod event;
pub mod module;
mod observable;
mod notify;
pub mod protobuf;
pub mod services;

View File

@ -1,6 +1,6 @@
use flowy_derive::ProtoBuf_Enum;
use flowy_observable::NotifyBuilder;
use flowy_observable::DartNotifyBuilder;
const OBSERVABLE_CATEGORY: &'static str = "User";
@ -20,4 +20,6 @@ impl std::convert::Into<i32> for UserObservable {
fn into(self) -> i32 { self as i32 }
}
pub(crate) fn notify(id: &str, ty: UserObservable) -> NotifyBuilder { NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) }
pub(crate) fn dart_notify(id: &str, ty: UserObservable) -> DartNotifyBuilder {
DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
}

View File

@ -52,7 +52,7 @@ impl UserServerAPI for UserServer {
fn ws_addr(&self) -> String { self.config.ws_addr() }
}
use crate::{errors::ErrorCode, observable::*};
use crate::{errors::ErrorCode, notify::*};
use flowy_net::response::FlowyResponse;
use lazy_static::lazy_static;
use std::sync::Arc;
@ -70,7 +70,7 @@ impl ResponseMiddleware for Middleware {
None => {},
Some(token) => {
let error = UserError::new(ErrorCode::UserUnauthorized, "");
notify(token, UserObservable::UserUnauthorized).error(error).send()
dart_notify(token, UserObservable::UserUnauthorized).error(error).send()
},
}
}

View File

@ -6,7 +6,7 @@ use crate::{
};
use crate::{
observable::*,
notify::*,
services::server::{construct_user_server, Server},
};
use flowy_database::{
@ -194,12 +194,12 @@ impl UserSession {
tokio::spawn(async move {
match server.get_user(&token).await {
Ok(profile) => {
notify(&token, UserObservable::UserProfileUpdated)
dart_notify(&token, UserObservable::UserProfileUpdated)
.payload(profile)
.send();
},
Err(e) => {
notify(&token, UserObservable::UserProfileUpdated).error(e).send();
dart_notify(&token, UserObservable::UserProfileUpdated).error(e).send();
},
}
});

View File

@ -1,3 +1,3 @@
proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/observable"]
proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"]
event_files = ["src/event.rs"]

View File

@ -1,5 +1,5 @@
mod handlers;
mod observable;
mod notify;
mod services;
mod sql_tables;

View File

@ -1,5 +1,5 @@
use flowy_derive::ProtoBuf_Enum;
use flowy_observable::NotifyBuilder;
use flowy_observable::DartNotifyBuilder;
const OBSERVABLE_CATEGORY: &'static str = "Workspace";
#[derive(ProtoBuf_Enum, Debug)]
@ -26,6 +26,6 @@ impl std::convert::Into<i32> for WorkspaceObservable {
fn into(self) -> i32 { self as i32 }
}
pub(crate) fn notify(id: &str, ty: WorkspaceObservable) -> NotifyBuilder {
NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
pub(crate) fn dart_notify(id: &str, ty: WorkspaceObservable) -> DartNotifyBuilder {
DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
}

View File

@ -2,7 +2,7 @@ use crate::{
entities::app::{App, CreateAppParams, *},
errors::*,
module::{WorkspaceDatabase, WorkspaceUser},
observable::*,
notify::*,
services::{helper::spawn, server::Server},
sql_tables::app::{AppTable, AppTableChangeset, AppTableSql},
};
@ -36,7 +36,7 @@ impl AppController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.save_app(app.clone(), &*conn)?;
let apps = self.read_local_apps(&app.workspace_id, &*conn)?;
notify(&app.workspace_id, WorkspaceObservable::WorkspaceCreateApp)
dart_notify(&app.workspace_id, WorkspaceObservable::WorkspaceCreateApp)
.payload(apps)
.send();
Ok(())
@ -64,7 +64,7 @@ impl AppController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let app = self.sql.delete_app(app_id, &*conn)?;
let apps = self.read_local_apps(&app.workspace_id, &*conn)?;
notify(&app.workspace_id, WorkspaceObservable::WorkspaceDeleteApp)
dart_notify(&app.workspace_id, WorkspaceObservable::WorkspaceDeleteApp)
.payload(apps)
.send();
Ok(())
@ -87,7 +87,9 @@ impl AppController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.sql.update_app(changeset, conn)?;
let app: App = self.sql.read_app(&app_id, None, conn)?.into();
notify(&app_id, WorkspaceObservable::AppUpdated).payload(app).send();
dart_notify(&app_id, WorkspaceObservable::AppUpdated)
.payload(app)
.send();
Ok(())
})?;

View File

@ -6,7 +6,7 @@ lazy_static! {
use crate::{
errors::{ErrorCode, WorkspaceError},
observable::*,
notify::*,
};
use flowy_net::{request::ResponseMiddleware, response::FlowyResponse};
@ -21,7 +21,9 @@ impl ResponseMiddleware for WorkspaceMiddleware {
None => {},
Some(token) => {
let error = WorkspaceError::new(ErrorCode::UserUnauthorized, "");
notify(token, WorkspaceObservable::UserUnauthorized).error(error).send()
dart_notify(token, WorkspaceObservable::UserUnauthorized)
.error(error)
.send()
},
}
}

View File

@ -2,7 +2,7 @@ use crate::{
entities::view::{CreateViewParams, UpdateViewParams, View},
errors::WorkspaceError,
module::WorkspaceDatabase,
observable::notify,
notify::dart_notify,
services::{helper::spawn, server::Server},
sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
};
@ -11,7 +11,7 @@ use crate::{
entities::view::{DeleteViewParams, QueryViewParams, RepeatedView},
errors::internal_error,
module::WorkspaceUser,
observable::WorkspaceObservable,
notify::WorkspaceObservable,
};
use flowy_database::SqliteConnection;
use flowy_document::{
@ -55,7 +55,7 @@ impl ViewController {
.create(CreateDocParams::new(&view.id, params.data), conn)?;
let repeated_view = self.read_local_views_belong_to(&view.belong_to_id, conn)?;
notify(&view.belong_to_id, WorkspaceObservable::AppCreateView)
dart_notify(&view.belong_to_id, WorkspaceObservable::AppCreateView)
.payload(repeated_view)
.send();
Ok(())
@ -93,7 +93,7 @@ impl ViewController {
let _ = self.document.delete(params.into(), conn)?;
let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id, conn)?;
notify(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView)
dart_notify(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView)
.payload(repeated_view)
.send();
Ok(())
@ -119,7 +119,9 @@ impl ViewController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.sql.update_view(changeset, conn)?;
let view: View = self.sql.read_view(&view_id, None, conn)?.into();
notify(&view_id, WorkspaceObservable::ViewUpdated).payload(view).send();
dart_notify(&view_id, WorkspaceObservable::ViewUpdated)
.payload(view)
.send();
Ok(())
})?;

View File

@ -5,7 +5,7 @@ use crate::{
},
errors::*,
module::{WorkspaceDatabase, WorkspaceUser},
observable::*,
notify::*,
services::{helper::spawn, server::Server, AppController, ViewController},
sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
};
@ -61,7 +61,7 @@ impl WorkspaceController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
self.workspace_sql.create_workspace(workspace_table, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
notify(&token, WorkspaceObservable::UserCreateWorkspace)
dart_notify(&token, WorkspaceObservable::UserCreateWorkspace)
.payload(repeated_workspace)
.send();
@ -80,7 +80,7 @@ impl WorkspaceController {
let _ = self.workspace_sql.update_workspace(changeset, conn)?;
let user_id = self.user.user_id()?;
let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
notify(&workspace_id, WorkspaceObservable::WorkspaceUpdated)
dart_notify(&workspace_id, WorkspaceObservable::WorkspaceUpdated)
.payload(workspace)
.send();
@ -100,7 +100,7 @@ impl WorkspaceController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
notify(&token, WorkspaceObservable::UserDeleteWorkspace)
dart_notify(&token, WorkspaceObservable::UserDeleteWorkspace)
.payload(repeated_workspace)
.send();
@ -289,7 +289,7 @@ impl WorkspaceController {
Ok(())
})?;
notify(&token, WorkspaceObservable::WorkspaceListUpdated)
dart_notify(&token, WorkspaceObservable::WorkspaceListUpdated)
.payload(workspaces)
.send();
Result::<(), WorkspaceError>::Ok(())