use crate::{errors::ServerError, response::FlowyResponse}; use bytes::Bytes; use hyper::http; use protobuf::ProtobufError; use reqwest::{Client, Method, Response}; use std::{ convert::{TryFrom, TryInto}, time::Duration, }; use tokio::sync::oneshot; pub struct HttpRequestBuilder { url: String, body: Option, response: Option, method: Method, } impl HttpRequestBuilder { fn new(url: &str) -> Self { Self { url: url.to_owned(), body: None, response: None, method: Method::GET, } } pub fn get(url: &str) -> Self { let mut builder = Self::new(url); builder.method = Method::GET; builder } pub fn post(url: &str) -> Self { let mut builder = Self::new(url); builder.method = Method::POST; builder } pub fn patch(url: &str) -> Self { let mut builder = Self::new(url); builder.method = Method::PATCH; builder } pub fn delete(url: &str) -> Self { let mut builder = Self::new(url); builder.method = Method::DELETE; builder } pub fn protobuf(mut self, body: T1) -> Result where T1: TryInto, { let body: Bytes = body.try_into()?; self.body = Some(body); Ok(self) } pub async fn send(mut self) -> Result { let (tx, rx) = oneshot::channel::>(); let url = self.url.clone(); let body = self.body.take(); let method = self.method.clone(); // reqwest client is not 'Sync' by channel is. tokio::spawn(async move { let client = default_client(); let mut builder = client.request(method, url); if let Some(body) = body { builder = builder.body(body); } let response = builder.send().await; tx.send(response); }); let response = rx.await??; self.response = Some(response); Ok(self) } pub async fn response(self) -> Result where T2: TryFrom, { match self.response { None => { let msg = format!("Request: {} receives unexpected empty body", self.url); Err(ServerError::payload_none().context(msg)) }, Some(response) => { let data = get_response_data(response).await?; Ok(T2::try_from(data)?) }, } } } #[allow(dead_code)] pub async fn http_post(url: &str, data: T1) -> Result where T1: TryInto, T2: TryFrom, { let body: Bytes = data.try_into()?; let url = url.to_owned(); let (tx, rx) = oneshot::channel::>(); // reqwest client is not 'Sync' by channel is. tokio::spawn(async move { let client = default_client(); let response = client.post(&url).body(body).send().await; tx.send(response); }); let response = rx.await??; let data = get_response_data(response).await?; Ok(T2::try_from(data)?) } async fn get_response_data(original: Response) -> Result { if original.status() == http::StatusCode::OK { let bytes = original.bytes().await?; let response: FlowyResponse = serde_json::from_slice(&bytes)?; match response.error { None => Ok(response.data), Some(error) => Err(error), } } else { Err(ServerError::http().context(original)) } } fn default_client() -> Client { let result = reqwest::Client::builder() .connect_timeout(Duration::from_millis(500)) .timeout(Duration::from_secs(5)) .build(); match result { Ok(client) => client, Err(e) => { log::error!("Create reqwest client failed: {}", e); reqwest::Client::new() }, } }