diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 1456d14408..c8214fc942 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -6,14 +6,17 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix = "0.10" -actix-web = "3" -actix-http = "2.2.1" -actix-web-actors = "3" +actix = "0.12" +#actix-web = "3" +#actix-http = "2.2.1" +#actix-web-actors = "3" actix-codec = "0.3" +actix-web = "4.0.0-beta.8" +actix-http = "3.0.0-beta.8" +actix-web-actors = { version = "4.0.0-beta.6" } futures = "0.3.15" -bytes = "0.5" +bytes = "1" toml = "0.5.8" dashmap = "4.0" log = "0.4.14" @@ -22,12 +25,13 @@ serde = { version = "1.0", features = ["derive"] } serde_repr = "0.1" derive_more = {version = "0.99", features = ["display"]} protobuf = {version = "2.20.0"} + flowy-log = { path = "../rust-lib/flowy-log" } flowy-user = { path = "../rust-lib/flowy-user" } flowy-net = { path = "../rust-lib/flowy-net", features = ["http"] } [dependencies.sqlx] -version = "0.5.2" +version = "0.5.6" default-features = false features = [ "runtime-actix-rustls", diff --git a/backend/src/routers/ws.rs b/backend/src/routers/ws.rs index 199d8d3d25..5c7e59a34d 100644 --- a/backend/src/routers/ws.rs +++ b/backend/src/routers/ws.rs @@ -14,10 +14,10 @@ use actix_web_actors::ws; pub async fn start_connection( request: HttpRequest, payload: Payload, - Path(token): Path, + path: Path, server: Data>, ) -> Result { - let client = WSClient::new(SessionId::new(token), server.get_ref().clone()); + let client = WSClient::new(SessionId::new(path.clone()), server.get_ref().clone()); let result = ws::start(client, &request, payload); match result { diff --git a/backend/src/ws_service/ws_client.rs b/backend/src/ws_service/ws_client.rs index 8c0ac23577..a5e66f821c 100644 --- a/backend/src/ws_service/ws_client.rs +++ b/backend/src/ws_service/ws_client.rs @@ -12,6 +12,7 @@ use actix::{ Actor, ActorContext, ActorFuture, + ActorFutureExt, Addr, AsyncContext, ContextFutureSpawner, @@ -123,13 +124,13 @@ impl StreamHandler> for WSClient { }, Ok(Text(s)) => { log::debug!("Receive {} text {:?}", &self.sid, &s); - self.send(MessageData::Text(s)); + self.send(MessageData::Text(s.to_string())); }, Err(e) => { let msg = format!("{} error: {:?}", &self.sid, e); - ctx.text(&msg); log::error!("stream {}", msg); + ctx.text(msg); ctx.stop(); }, } diff --git a/rust-lib/flowy-derive/src/proto_buf/deserialize.rs b/rust-lib/flowy-derive/src/proto_buf/deserialize.rs index 5bcc19b80d..a490da261d 100644 --- a/rust-lib/flowy-derive/src/proto_buf/deserialize.rs +++ b/rust-lib/flowy-derive/src/proto_buf/deserialize.rs @@ -23,20 +23,15 @@ pub fn make_de_token_steam(ctxt: &Ctxt, ast: &ASTContainer) -> Option for #struct_ident { - type Error = String; + type Error = ::protobuf::ProtobufError; fn try_from(bytes: &bytes::Bytes) -> Result { - let result: ::protobuf::ProtobufResult = ::protobuf::Message::parse_from_bytes(&bytes); - match result { - Ok(mut pb) => { - #struct_ident::try_from(&mut pb) - } - Err(e) => Err(format!("{:?}", e)), - } + let mut pb: crate::protobuf::#pb_ty = ::protobuf::Message::parse_from_bytes(&bytes)?; + #struct_ident::try_from(&mut pb) } } impl std::convert::TryFrom<&mut crate::protobuf::#pb_ty> for #struct_ident { - type Error = String; + type Error = ::protobuf::ProtobufError; fn try_from(pb: &mut crate::protobuf::#pb_ty) -> Result { let mut o = Self::default(); #(#build_take_fields)* diff --git a/rust-lib/flowy-derive/src/proto_buf/serialize.rs b/rust-lib/flowy-derive/src/proto_buf/serialize.rs index ca85b480c8..20de80496c 100644 --- a/rust-lib/flowy-derive/src/proto_buf/serialize.rs +++ b/rust-lib/flowy-derive/src/proto_buf/serialize.rs @@ -18,20 +18,17 @@ pub fn make_se_token_stream(ctxt: &Ctxt, ast: &ASTContainer) -> Option for #struct_ident { - type Error = String; + type Error = ::protobuf::ProtobufError; fn try_into(self) -> Result { use protobuf::Message; let pb: crate::protobuf::#pb_ty = self.try_into()?; - let result: ::protobuf::ProtobufResult> = pb.write_to_bytes(); - match result { - Ok(bytes) => { Ok(bytes::Bytes::from(bytes)) }, - Err(e) => { Err(format!("{:?}", e)) } - } + let bytes = pb.write_to_bytes()?; + Ok(bytes::Bytes::from(bytes)) } } impl std::convert::TryInto for #struct_ident { - type Error = String; + type Error = ::protobuf::ProtobufError; fn try_into(self) -> Result { let mut pb = crate::protobuf::#pb_ty::new(); #(#build_set_pb_fields)* diff --git a/rust-lib/flowy-dispatch/src/byte_trait.rs b/rust-lib/flowy-dispatch/src/byte_trait.rs index 084290f5c2..1c427f846e 100644 --- a/rust-lib/flowy-dispatch/src/byte_trait.rs +++ b/rust-lib/flowy-dispatch/src/byte_trait.rs @@ -1,16 +1,29 @@ +use crate::errors::{DispatchError, InternalError}; use bytes::Bytes; +use protobuf::ProtobufError; // To bytes pub trait ToBytes { - fn into_bytes(self) -> Result; + fn into_bytes(self) -> Result; } #[cfg(feature = "use_protobuf")] impl ToBytes for T where - T: std::convert::TryInto, + T: std::convert::TryInto, { - fn into_bytes(self) -> Result { self.try_into() } + fn into_bytes(self) -> Result { + match self.try_into() { + Ok(data) => Ok(data), + Err(e) => { + // let system_err: DispatchError = InternalError::new(format!("{:?}", + // e)).into(); system_err.into() + // Err(format!("{:?}", e)) + + Err(InternalError::ProtobufError(format!("{:?}", e)).into()) + }, + } + } } #[cfg(feature = "use_serde")] @@ -18,10 +31,10 @@ impl ToBytes for T where T: serde::Serialize, { - fn into_bytes(self) -> Result { + fn into_bytes(self) -> Result { match serde_json::to_string(&self.0) { Ok(s) => Ok(Bytes::from(s)), - Err(e) => Err(format!("{:?}", e)), + Err(e) => Err(InternalError::SerializeToBytes(format!("{:?}", e)).into()), } } } @@ -29,16 +42,19 @@ where // From bytes pub trait FromBytes: Sized { - fn parse_from_bytes(bytes: Bytes) -> Result; + fn parse_from_bytes(bytes: Bytes) -> Result; } #[cfg(feature = "use_protobuf")] impl FromBytes for T where // https://stackoverflow.com/questions/62871045/tryfromu8-trait-bound-in-trait - T: for<'a> std::convert::TryFrom<&'a Bytes, Error = String>, + T: for<'a> std::convert::TryFrom<&'a Bytes, Error = protobuf::ProtobufError>, { - fn parse_from_bytes(bytes: Bytes) -> Result { T::try_from(&bytes) } + fn parse_from_bytes(bytes: Bytes) -> Result { + let data = T::try_from(&bytes)?; + Ok(data) + } } #[cfg(feature = "use_serde")] diff --git a/rust-lib/flowy-dispatch/src/data.rs b/rust-lib/flowy-dispatch/src/data.rs index 4265c56ba9..523d03931d 100644 --- a/rust-lib/flowy-dispatch/src/data.rs +++ b/rust-lib/flowy-dispatch/src/data.rs @@ -37,7 +37,9 @@ where Payload::None => ready(Err(unexpected_none_payload(req))), Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) { Ok(data) => ready(Ok(Data(data))), - Err(e) => ready(Err(InternalError::new(format!("{}", e)).into())), + Err(e) => ready(Err( + InternalError::DeserializeFromBytes(format!("{}", e)).into() + )), }, } } @@ -50,10 +52,7 @@ where fn respond_to(self, _request: &EventRequest) -> EventResponse { match self.into_inner().into_bytes() { Ok(bytes) => ResponseBuilder::Ok().data(bytes).build(), - Err(e) => { - let system_err: DispatchError = InternalError::new(format!("{}", e)).into(); - system_err.into() - }, + Err(e) => e.into(), } } } @@ -62,7 +61,7 @@ impl std::convert::TryFrom<&Payload> for Data where T: FromBytes, { - type Error = String; + type Error = DispatchError; fn try_from(payload: &Payload) -> Result, Self::Error> { parse_payload(payload) } } @@ -70,19 +69,21 @@ impl std::convert::TryFrom for Data where T: FromBytes, { - type Error = String; + type Error = DispatchError; fn try_from(payload: Payload) -> Result, Self::Error> { parse_payload(&payload) } } -fn parse_payload(payload: &Payload) -> Result, String> +fn parse_payload(payload: &Payload) -> Result, DispatchError> where T: FromBytes, { match payload { - Payload::None => Err(format!("Parse fail, expected payload")), - Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) { - Ok(data) => Ok(Data(data)), - Err(e) => Err(e), + Payload::None => { + Err(InternalError::UnexpectedNone(format!("Parse fail, expected payload")).into()) + }, + Payload::Bytes(bytes) => { + let data = T::parse_from_bytes(bytes.clone())?; + Ok(Data(data)) }, } } @@ -91,7 +92,7 @@ impl std::convert::TryInto for Data where T: ToBytes, { - type Error = String; + type Error = DispatchError; fn try_into(self) -> Result { let inner = self.into_inner(); @@ -101,5 +102,5 @@ where } impl ToBytes for Data { - fn into_bytes(self) -> Result { Ok(Bytes::from(self.0)) } + fn into_bytes(self) -> Result { Ok(Bytes::from(self.0)) } } diff --git a/rust-lib/flowy-dispatch/src/dispatch.rs b/rust-lib/flowy-dispatch/src/dispatch.rs index e8badd4b9c..8f474d0741 100644 --- a/rust-lib/flowy-dispatch/src/dispatch.rs +++ b/rust-lib/flowy-dispatch/src/dispatch.rs @@ -68,14 +68,17 @@ impl EventDispatch { service .call(service_ctx) .await - .unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response()) + .unwrap_or_else(|e| InternalError::Other(format!("{:?}", e)).as_response()) }); DispatchFuture { fut: Box::pin(async move { join_handle.await.unwrap_or_else(|e| { - InternalError::new(format!("EVENT_DISPATCH join error: {:?}", e)) - .as_response() + let error = InternalError::JoinError(format!( + "EVENT_DISPATCH join error: {:?}", + e + )); + error.as_response() }) }), } @@ -83,9 +86,8 @@ impl EventDispatch { Err(e) => { let msg = format!("EVENT_DISPATCH read failed. {:?}", e); - log::error!("{}", msg); DispatchFuture { - fut: Box::pin(async { InternalError::new(msg).as_response() }), + fut: Box::pin(async { InternalError::Lock(msg).as_response() }), } }, } @@ -165,7 +167,7 @@ impl Service for DispatchService { None => { let msg = format!("Can not find the event handler. {:?}", request); log::error!("{}", msg); - Err(InternalError::new(msg).into()) + Err(InternalError::HandleNotFound(msg).into()) }, } }; diff --git a/rust-lib/flowy-dispatch/src/errors/errors.rs b/rust-lib/flowy-dispatch/src/errors/errors.rs index b0ad881153..34bd3a6d45 100644 --- a/rust-lib/flowy-dispatch/src/errors/errors.rs +++ b/rust-lib/flowy-dispatch/src/errors/errors.rs @@ -5,9 +5,10 @@ use crate::{ }; use bytes::Bytes; use dyn_clone::DynClone; +use protobuf::ProtobufError; use serde::{Serialize, Serializer}; use std::{fmt, option::NoneError}; -use tokio::sync::mpsc::error::SendError; +use tokio::{sync::mpsc::error::SendError, task::JoinError}; pub trait Error: fmt::Debug + DynClone + Send + Sync { fn as_response(&self) -> EventResponse; @@ -48,30 +49,31 @@ impl std::error::Error for DispatchError { impl From> for DispatchError { fn from(err: SendError) -> Self { - InternalError { - inner: format!("{}", err), - } - .into() + InternalError::Other(format!("{}", err)).into() } } impl From for DispatchError { fn from(s: NoneError) -> Self { - InternalError { - inner: format!("Unexpected none: {:?}", s), - } - .into() + InternalError::UnexpectedNone(format!("Unexpected none: {:?}", s)).into() } } impl From for DispatchError { - fn from(s: String) -> Self { InternalError { inner: s }.into() } + fn from(s: String) -> Self { InternalError::Other(s).into() } +} + +#[cfg(feature = "use_protobuf")] +impl From for DispatchError { + fn from(e: protobuf::ProtobufError) -> Self { + InternalError::ProtobufError(format!("{:?}", e)).into() + } } impl FromBytes for DispatchError { - fn parse_from_bytes(bytes: Bytes) -> Result { + fn parse_from_bytes(bytes: Bytes) -> Result { let s = String::from_utf8(bytes.to_vec()).unwrap(); - Ok(InternalError { inner: s }.into()) + Ok(InternalError::DeserializeFromBytes(s).into()) } } @@ -79,39 +81,6 @@ impl From for EventResponse { fn from(err: DispatchError) -> Self { err.inner_error().as_response() } } -#[derive(Clone)] -pub(crate) struct InternalError { - inner: T, -} - -impl InternalError { - pub fn new(inner: T) -> Self { InternalError { inner } } -} - -impl fmt::Debug for InternalError -where - T: fmt::Debug + 'static + Clone + Send + Sync, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.inner, f) } -} - -impl fmt::Display for InternalError -where - T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.inner, f) } -} - -impl Error for InternalError -where - T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync, -{ - fn as_response(&self) -> EventResponse { - let error = format!("{}", self.inner).into_bytes(); - ResponseBuilder::Err().data(error).build() - } -} - impl Serialize for DispatchError { fn serialize(&self, serializer: S) -> Result<::Ok, ::Error> where @@ -120,3 +89,43 @@ impl Serialize for DispatchError { serializer.serialize_str(&format!("{}", self)) } } + +#[derive(Clone, Debug)] +pub(crate) enum InternalError { + ProtobufError(String), + UnexpectedNone(String), + DeserializeFromBytes(String), + SerializeToBytes(String), + JoinError(String), + Lock(String), + ServiceNotFound(String), + HandleNotFound(String), + Other(String), +} + +impl fmt::Display for InternalError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + InternalError::ProtobufError(s) => fmt::Display::fmt(&s, f), + InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f), + InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f), + InternalError::SerializeToBytes(s) => fmt::Display::fmt(&s, f), + InternalError::JoinError(s) => fmt::Display::fmt(&s, f), + InternalError::Lock(s) => fmt::Display::fmt(&s, f), + InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f), + InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f), + InternalError::Other(s) => fmt::Display::fmt(&s, f), + } + } +} + +impl Error for InternalError { + fn as_response(&self) -> EventResponse { + let error = format!("{}", self).into_bytes(); + ResponseBuilder::Err().data(error).build() + } +} + +impl std::convert::From for InternalError { + fn from(e: JoinError) -> Self { InternalError::JoinError(format!("{}", e)) } +} diff --git a/rust-lib/flowy-dispatch/src/module/data.rs b/rust-lib/flowy-dispatch/src/module/data.rs index 368126cd28..eac2cd3f12 100644 --- a/rust-lib/flowy-dispatch/src/module/data.rs +++ b/rust-lib/flowy-dispatch/src/module/data.rs @@ -56,7 +56,7 @@ where type_name::() ); log::error!("{}", msg,); - ready(Err(InternalError::new(msg).into())) + ready(Err(InternalError::Other(msg).into())) } } } diff --git a/rust-lib/flowy-dispatch/src/module/module.rs b/rust-lib/flowy-dispatch/src/module/module.rs index ffe79c11bf..e0893be7e5 100644 --- a/rust-lib/flowy-dispatch/src/module/module.rs +++ b/rust-lib/flowy-dispatch/src/module/module.rs @@ -192,7 +192,13 @@ impl Service for ModuleService { }; Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) }, - None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }), + None => { + let msg = format!( + "Can not find service factory for event: {:?}", + request.event + ); + Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) }) + }, } } } diff --git a/rust-lib/flowy-dispatch/src/request/request.rs b/rust-lib/flowy-dispatch/src/request/request.rs index 0d0b2c93ee..edcfc3fe36 100644 --- a/rust-lib/flowy-dispatch/src/request/request.rs +++ b/rust-lib/flowy-dispatch/src/request/request.rs @@ -77,7 +77,7 @@ impl FromRequest for String { pub fn unexpected_none_payload(request: &EventRequest) -> DispatchError { log::warn!("{:?} expected payload", &request.event); - InternalError::new("Expected payload").into() + InternalError::UnexpectedNone("Expected payload".to_string()).into() } #[doc(hidden)] diff --git a/rust-lib/flowy-dispatch/src/response/response.rs b/rust-lib/flowy-dispatch/src/response/response.rs index 32356472f1..95f26c047c 100644 --- a/rust-lib/flowy-dispatch/src/response/response.rs +++ b/rust-lib/flowy-dispatch/src/response/response.rs @@ -36,15 +36,11 @@ impl EventResponse { E: FromBytes, { if self.status_code == StatusCode::Err { - match >::try_from(self.payload) { - Ok(err) => Ok(Err(err.into_inner())), - Err(e) => Err(InternalError::new(e).into()), - } + let err = >::try_from(self.payload)?; + Ok(Err(err.into_inner())) } else { - match >::try_from(self.payload) { - Ok(a) => Ok(Ok(a.into_inner())), - Err(e) => Err(InternalError::new(e).into()), - } + let data = >::try_from(self.payload)?; + Ok(Ok(data.into_inner())) } } } diff --git a/rust-lib/flowy-net/Cargo.toml b/rust-lib/flowy-net/Cargo.toml index 65ccf20b5b..ed43e3e68e 100644 --- a/rust-lib/flowy-net/Cargo.toml +++ b/rust-lib/flowy-net/Cargo.toml @@ -11,12 +11,14 @@ protobuf = {version = "2.18.0"} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_repr = "0.1" -actix-web = {version = "3", optional = true} pin-project = "1.0.0" futures-core = { version = "0.3", default-features = false } log = "0.4" bytes = "1.0" lazy_static = "1.4.0" +tokio = { version = "1", features = ["full"] } + +actix-web = {version = "4.0.0-beta.8", optional = true} [features] http = ["actix-web"] \ No newline at end of file diff --git a/rust-lib/flowy-net/src/config.rs b/rust-lib/flowy-net/src/config.rs index 8c529766f3..8440c3996b 100644 --- a/rust-lib/flowy-net/src/config.rs +++ b/rust-lib/flowy-net/src/config.rs @@ -1,6 +1,6 @@ use lazy_static::lazy_static; -pub const HOST: &'static str = "0.0.0.0:3030"; +pub const HOST: &'static str = "http://0.0.0.0:3030"; lazy_static! { pub static ref SIGN_UP_URL: String = format!("{}/user/register", HOST); diff --git a/rust-lib/flowy-net/src/errors.rs b/rust-lib/flowy-net/src/errors.rs index 322e19bdc1..c5d641175b 100644 --- a/rust-lib/flowy-net/src/errors.rs +++ b/rust-lib/flowy-net/src/errors.rs @@ -1,6 +1,5 @@ use crate::response::FlowyResponse; use protobuf::ProtobufError; - use std::fmt::{Formatter, Write}; #[derive(Debug)] @@ -36,3 +35,7 @@ impl std::convert::From for NetworkError { NetworkError::InternalError(msg) } } + +impl std::convert::From for NetworkError { + fn from(error: String) -> Self { NetworkError::InternalError(error) } +} diff --git a/rust-lib/flowy-net/src/request/request.rs b/rust-lib/flowy-net/src/request/request.rs index e91c965cfa..e4959062c3 100644 --- a/rust-lib/flowy-net/src/request/request.rs +++ b/rust-lib/flowy-net/src/request/request.rs @@ -1,47 +1,46 @@ -use crate::errors::NetworkError; +use crate::{errors::NetworkError, future::ResultFuture}; use bytes::Bytes; -use protobuf::Message; -use reqwest::{Client, Response}; -use std::{convert::TryFrom, time::Duration}; +use protobuf::{Message, ProtobufError}; +use reqwest::{Client, Error, Response}; +use std::{ + convert::{TryFrom, TryInto}, + time::Duration, +}; +use tokio::sync::{oneshot, oneshot::error::RecvError}; -pub struct FlowyRequest { - client: Client, +pub async fn http_post(url: &str, data: T1) -> ResultFuture +where + T1: TryInto + Send + Sync + 'static, + T2: TryFrom + Send + Sync + 'static, +{ + let url = url.to_owned(); + ResultFuture::new(async move { post(url, data).await }) } -impl FlowyRequest { - pub fn new() -> Self { +pub async fn post(url: String, data: T1) -> Result +where + T1: TryInto, + T2: TryFrom, +{ + let request_bytes: Bytes = data.try_into()?; + let (tx, rx) = oneshot::channel::>(); + + tokio::spawn(async move { let client = default_client(); - Self { client } - } + let response = client.post(&url).body(request_bytes).send().await; + tx.send(response); + }); - pub async fn get(&self, url: &str) -> Result - where - T: Message, - { - let url = url.to_owned(); - let response = self.client.get(&url).send().await?; - parse_response(response).await - } - - pub async fn post(&self, url: &str, data: T) -> Result - where - T: Message, - { - let url = url.to_owned(); - let body = data.write_to_bytes()?; - let response = self.client.post(&url).body(body).send().await?; - parse_response(response).await - } - - pub async fn post_data(&self, url: &str, bytes: Vec) -> Result - where - T: for<'a> TryFrom<&'a Vec>, - { - let url = url.to_owned(); - let response = self.client.post(&url).body(bytes).send().await?; - let bytes = response.bytes().await?.to_vec(); - let data = T::try_from(&bytes).map_err(|_e| panic!("")).unwrap(); - Ok(data) + match rx.await { + Ok(response) => { + let response = response?; + let response_bytes = response.bytes().await?; + let data = T2::try_from(response_bytes)?; + Ok(data) + }, + Err(e) => { + unimplemented!() + }, } } diff --git a/rust-lib/flowy-net/src/response/response_http.rs b/rust-lib/flowy-net/src/response/response_http.rs index e357f5b848..35c270d58d 100644 --- a/rust-lib/flowy-net/src/response/response_http.rs +++ b/rust-lib/flowy-net/src/response/response_http.rs @@ -1,9 +1,9 @@ use crate::{errors::NetworkError, response::*}; -use actix_web::{body::Body, error::ResponseError, HttpResponse}; +use actix_web::{body::Body, error::ResponseError, BaseHttpResponse, HttpResponse}; use serde::Serialize; -impl ResponseError for NetworkError { - fn error_response(&self) -> HttpResponse { +impl NetworkError { + fn http_response(&self) -> HttpResponse { match self { NetworkError::InternalError(msg) => { let resp = FlowyResponse::from_msg(&msg, ServerCode::InternalError); @@ -18,6 +18,10 @@ impl ResponseError for NetworkError { } } +impl ResponseError for NetworkError { + fn error_response(&self) -> HttpResponse { self.http_response().into() } +} + impl std::convert::Into for FlowyResponse { fn into(self) -> HttpResponse { match serde_json::to_string(&self) { diff --git a/rust-lib/flowy-test/src/tester.rs b/rust-lib/flowy-test/src/tester.rs index 9fd3e7e310..c882f78030 100644 --- a/rust-lib/flowy-test/src/tester.rs +++ b/rust-lib/flowy-test/src/tester.rs @@ -64,9 +64,15 @@ pub trait TesterTrait { where P: ToBytes, { - let bytes = payload.into_bytes().unwrap(); - let module_request = self.mut_context().request.take().unwrap(); - self.mut_context().request = Some(module_request.payload(bytes)); + match payload.into_bytes() { + Ok(bytes) => { + let module_request = self.mut_context().request.take().unwrap(); + self.mut_context().request = Some(module_request.payload(bytes)); + }, + Err(e) => { + log::error!("Set payload failed: {:?}", e); + }, + } } fn sync_send(&mut self) { diff --git a/rust-lib/flowy-user/src/services/user/user_server.rs b/rust-lib/flowy-user/src/services/user/user_server.rs index 089b8f7891..22eb701799 100644 --- a/rust-lib/flowy-user/src/services/user/user_server.rs +++ b/rust-lib/flowy-user/src/services/user/user_server.rs @@ -3,10 +3,11 @@ use crate::{ errors::{ErrorBuilder, UserErrCode, UserError}, }; -use flowy_net::{future::ResultFuture, request::FlowyRequest}; +use bytes::Bytes; +use flowy_net::{config::SIGN_UP_URL, future::ResultFuture, request::http_post}; use std::sync::Arc; -pub(crate) trait UserServer { +pub trait UserServer { fn sign_up(&self, params: SignUpParams) -> ResultFuture; fn sign_in(&self, params: SignInParams) -> ResultFuture; fn sign_out(&self, user_id: &str) -> ResultFuture<(), UserError>; @@ -25,23 +26,8 @@ pub struct UserServerImpl {} impl UserServerImpl {} impl UserServer for UserServerImpl { - fn sign_up(&self, _params: SignUpParams) -> ResultFuture { - // let bytes: Vec = params.try_into().unwrap(); - // ResultFuture::new(async move { - // match FlowyRequest::new() - // .post_data::("SIGN_UP_URL.as_ref()", bytes) - // .await - // { - // Ok(a) => {}, - // Err(err) => {}, - // } - // - // Ok(SignUpResponse { - // uid: "".to_string(), - // name: "".to_string(), - // email: "".to_string(), - // }) - // }) + fn sign_up(&self, params: SignUpParams) -> ResultFuture { + // http_post(SIGN_UP_URL.as_ref(), params) unimplemented!() } diff --git a/rust-lib/flowy-workspace/src/observable/observable.rs b/rust-lib/flowy-workspace/src/observable/observable.rs index a556884403..ae5ff30880 100644 --- a/rust-lib/flowy-workspace/src/observable/observable.rs +++ b/rust-lib/flowy-workspace/src/observable/observable.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use flowy_derive::ProtoBuf_Enum; -use flowy_dispatch::prelude::ToBytes; +use flowy_dispatch::prelude::{DispatchError, ToBytes}; use flowy_observable::{dart::RustStreamSender, entities::ObservableSubject}; const OBSERVABLE_CATEGORY: &'static str = "Workspace"; @@ -47,8 +47,13 @@ impl ObservableSender { where T: ToBytes, { - let bytes = payload.into_bytes().unwrap(); - self.payload = Some(bytes); + match payload.into_bytes() { + Ok(bytes) => self.payload = Some(bytes), + Err(e) => { + log::error!("Set observable payload failed: {:?}", e); + }, + } + self }