diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 878779d574..75727d13eb 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -10,7 +10,7 @@ actix = "0.12" #actix-web = "3" #actix-http = "2.2.1" #actix-web-actors = "3" -actix-codec = "0.3" +actix-codec = "0.4" actix-web = "4.0.0-beta.8" actix-http = "3.0.0-beta.8" actix-rt = "2" diff --git a/rust-lib/dart-ffi/Cargo.toml b/rust-lib/dart-ffi/Cargo.toml index dbaf8eb045..7467da3fa5 100644 --- a/rust-lib/dart-ffi/Cargo.toml +++ b/rust-lib/dart-ffi/Cargo.toml @@ -20,7 +20,8 @@ byteorder = {version = "1.3.4"} ffi-support = {version = "0.4.2"} protobuf = {version = "2.20.0"} lazy_static = {version = "1.4.0"} -tokio = { version = "1", features = ["rt", "rt-multi-thread"] } +#tokio = { version = "1", features = ["rt", "rt-multi-thread"] } +tokio = { version = "1", features = ["full"] } log = "0.4.14" serde = { version = "1.0", features = ["derive"] } serde_json = {version = "1.0"} diff --git a/rust-lib/flowy-dispatch/src/dispatch.rs b/rust-lib/flowy-dispatch/src/dispatch.rs index 15e5549477..8279199b92 100644 --- a/rust-lib/flowy-dispatch/src/dispatch.rs +++ b/rust-lib/flowy-dispatch/src/dispatch.rs @@ -22,10 +22,11 @@ impl EventDispatch { where F: FnOnce() -> Vec, { + let runtime = tokio_default_runtime().unwrap(); let modules = module_factory(); log::trace!("{}", module_info(&modules)); let module_map = as_module_map(modules); - let runtime = tokio_default_runtime().unwrap(); + let dispatch = EventDispatch { module_map, runtime }; dispatch } @@ -74,6 +75,13 @@ impl EventDispatch { pub fn sync_send(dispatch: Arc, request: ModuleRequest) -> EventResponse { futures::executor::block_on(async { EventDispatch::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await }) } + + pub fn spawn(&self, f: F) + where + F: Future + Send + 'static, + { + self.runtime.spawn(f); + } } #[pin_project] diff --git a/rust-lib/flowy-document/Cargo.toml b/rust-lib/flowy-document/Cargo.toml index 0d8f622c4a..a600ef7d5c 100644 --- a/rust-lib/flowy-document/Cargo.toml +++ b/rust-lib/flowy-document/Cargo.toml @@ -23,7 +23,7 @@ protobuf = {version = "2.18.0"} unicode-segmentation = "1.7.1" lazy_static = "1.4.0" log = "0.4.14" -tokio = {version = "1.6.0", features = ["sync"]} +tokio = {version = "1", features = ["sync"]} tracing = { version = "0.1", features = ["log"] } bytes = { version = "1.0" } strum = "0.21" diff --git a/rust-lib/flowy-net/src/config.rs b/rust-lib/flowy-net/src/config.rs index a76198032e..0696108fa2 100644 --- a/rust-lib/flowy-net/src/config.rs +++ b/rust-lib/flowy-net/src/config.rs @@ -14,7 +14,7 @@ lazy_static! { pub static ref WORKSPACE_URL: String = format!("{}/api/workspace", HOST); pub static ref APP_URL: String = format!("{}/api/app", HOST); pub static ref VIEW_URL: String = format!("{}/api/view", HOST); - - // pub static ref DOC_URL: String = format!("{}/api/doc", HOST); + + pub static ref WS_ADDR: String = format!("ws://localhost:8000/ws"); } diff --git a/rust-lib/flowy-sdk/Cargo.toml b/rust-lib/flowy-sdk/Cargo.toml index 5e05e60061..fe5a4d21a8 100644 --- a/rust-lib/flowy-sdk/Cargo.toml +++ b/rust-lib/flowy-sdk/Cargo.toml @@ -13,11 +13,13 @@ flowy-infra = { path = "../flowy-infra" } flowy-workspace = { path = "../flowy-workspace" } flowy-database = { path = "../flowy-database" } flowy-document = { path = "../flowy-document" } +flowy-ws = { path = "../flowy-ws" } tracing = { version = "0.1" } log = "0.4.14" futures-core = { version = "0.3", default-features = false } color-eyre = { version = "0.5", default-features = false } bytes = "1.0" + [dev-dependencies] serde = { version = "1.0", features = ["derive"] } bincode = { version = "1.3"} diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index f2517eaf71..9693cd8190 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -3,6 +3,7 @@ mod deps_resolve; pub mod module; use flowy_dispatch::prelude::*; +use flowy_ws::start_ws_connection; use module::build_modules; pub use module::*; use std::sync::{ @@ -87,5 +88,9 @@ fn init_log(config: &FlowySDKConfig) { fn init_dispatch(root: &str) -> EventDispatch { let config = ModuleConfig { root: root.to_owned() }; let dispatch = EventDispatch::construct(|| build_modules(config)); + + dispatch.spawn(async { + start_ws_connection(); + }); dispatch } diff --git a/rust-lib/flowy-ws/Cargo.toml b/rust-lib/flowy-ws/Cargo.toml index d9c245d33d..d58f0f637e 100644 --- a/rust-lib/flowy-ws/Cargo.toml +++ b/rust-lib/flowy-ws/Cargo.toml @@ -7,13 +7,15 @@ edition = "2018" [dependencies] flowy-derive = { path = "../flowy-derive" } +flowy-net = { path = "../flowy-net" } tokio-tungstenite = "0.15" -futures-util = "0.3.15" -futures-channel = "0.3.15" -tokio = {version = "1.6.0", features = ["full"]} -futures = "0.3.15" +futures-util = "0.3.17" +futures-channel = "0.3.17" +tokio = {version = "1", features = ["full"]} +futures = "0.3.17" lazy_static = "1.4" +parking_lot = "0.11" bytes = "0.5" pin-project = "1.0.0" futures-core = { version = "0.3", default-features = false } @@ -22,4 +24,7 @@ url = "2.2.2" log = "0.4" protobuf = {version = "2.18.0"} strum = "0.21" -strum_macros = "0.21" \ No newline at end of file +strum_macros = "0.21" + +[dev-dependencies] +tokio = {version = "1", features = ["full"]} \ No newline at end of file diff --git a/rust-lib/flowy-ws/src/lib.rs b/rust-lib/flowy-ws/src/lib.rs index 15ba12e1a6..40e2b632dd 100644 --- a/rust-lib/flowy-ws/src/lib.rs +++ b/rust-lib/flowy-ws/src/lib.rs @@ -1,3 +1,5 @@ pub mod errors; pub mod protobuf; -pub mod ws; +mod ws; + +pub use ws::*; diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index e10d015541..14daeea857 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -1,7 +1,9 @@ use crate::errors::WsError; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; -use futures_core::{ready, Stream}; -use futures_util::StreamExt; +use futures_core::{future::BoxFuture, ready, Stream}; +use futures_util::{pin_mut, FutureExt, StreamExt}; +use lazy_static::lazy_static; +use parking_lot::RwLock; use pin_project::pin_project; use std::{ future::Future, @@ -10,30 +12,48 @@ use std::{ task::{Context, Poll}, }; use tokio::net::TcpStream; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{ + connect_async, + tungstenite::{handshake::client::Response, Error, Message}, + MaybeTlsStream, + WebSocketStream, +}; +lazy_static! { + pub static ref WS: RwLock = RwLock::new(WsController::new()); +} + +pub fn start_ws_connection() { WS.write().connect(flowy_net::config::WS_ADDR.as_ref()); } + pub type MsgReceiver = UnboundedReceiver; pub type MsgSender = UnboundedSender; - pub trait WsMessageHandler: Sync + Send + 'static { fn handler_message(&self, msg: &Message); } pub struct WsController { - connection: Option>, + sender: Option>, handlers: Vec>, } impl WsController { pub fn new() -> Self { - Self { - connection: None, + let controller = Self { + sender: None, handlers: vec![], - } + }; + + controller } pub fn add_handlers(&mut self, handler: Arc) { self.handlers.push(handler); } - pub async fn connect(&mut self, addr: &str) -> Result<(), WsError> { + pub fn connect(&mut self, addr: &str) { + let (ws, handlers) = self.make_connect(&addr); + let _ = tokio::spawn(ws); + let _ = tokio::spawn(handlers); + } + + fn make_connect(&mut self, addr: &str) -> (WsRaw, WsHandlers) { // Stream User // ┌───────────────┐ ┌──────────────┐ // ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │ @@ -44,38 +64,19 @@ impl WsController { // └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │ // │ └─────────┘ │ └────────┘ │ └────────┘ │ // └───────────────┘ └──────────────┘ - + let addr = addr.to_string(); let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded(); let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded(); - let mut ws_raw = WsRaw::new(msg_tx, ws_rx); - let connection = Arc::new(WsConnect::new(ws_tx)); + let sender = Arc::new(WsSender::new(ws_tx)); + let handlers = self.handlers.clone(); + self.sender = Some(sender.clone()); + log::debug!("🐴ws prepare connection"); - self.connection = Some(connection.clone()); - - let start_connect = { ws_raw.connect(&addr) }; - let spawn_handlers = SpawnHandlers::new(self.handlers.clone(), msg_rx); - // let spawn_handlers = { - // msg_rx.for_each(|message| async move { - // let handlers: Arc>> = Arc::new(vec![]); - // handlers.iter().for_each(|handler| { - // handler.handler_message(&message); - // }); - // }) - // }; - tokio::select! { - _ = spawn_handlers => { - log::debug!("Websocket read completed") - } - _ = start_connect => { - log::debug!("Connection completed") - } - }; - - Ok(()) + (WsRaw::new(msg_tx, ws_rx, addr), WsHandlers::new(handlers, msg_rx)) } pub fn send_message(&self, msg: Message) -> Result<(), WsError> { - match &self.connection { + match &self.sender { None => panic!(), Some(conn) => conn.send(msg), } @@ -83,19 +84,18 @@ impl WsController { } #[pin_project] -struct SpawnHandlers { +struct WsHandlers { #[pin] msg_rx: MsgReceiver, handlers: Vec>, } -impl SpawnHandlers { +impl WsHandlers { fn new(handlers: Vec>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } } -impl Future for SpawnHandlers { +impl Future for WsHandlers { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match ready!(self.as_mut().project().msg_rx.poll_next(cx)) { @@ -108,56 +108,169 @@ impl Future for SpawnHandlers { } } -pub struct WsConnect { +#[pin_project] +pub struct WsRaw { + msg_tx: Option, + ws_rx: Option, + #[pin] + fut: BoxFuture<'static, Result<(WebSocketStream>, Response), Error>>, +} + +impl WsRaw { + pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self { + WsRaw { + msg_tx: Some(msg_tx), + ws_rx: Some(ws_rx), + fut: Box::pin(async move { connect_async(&addr).await }), + } + } +} + +impl Future for WsRaw { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // [[pin]] + // poll async function. The following methods not work. + // 1. + // let f = connect_async(""); + // pin_mut!(f); + // ready!(Pin::new(&mut a).poll(cx)) + // + // 2.ready!(Pin::new(&mut Box::pin(connect_async(""))).poll(cx)) + // + // An async method calls poll multiple times and might return to the executor. A + // single poll call can only return to the executor once and will get + // resumed through another poll invocation. the connect_async call multiple time + // from the beginning. So I use fut to hold the future and continue to + // poll it. (Fix me if i was wrong) + + loop { + return match ready!(self.as_mut().project().fut.poll(cx)) { + Ok((stream, _)) => { + log::debug!("🐴 ws connect success"); + let mut ws_stream = WsStream { + msg_tx: self.msg_tx.take(), + ws_rx: self.ws_rx.take(), + stream: Some(stream), + }; + match Pin::new(&mut ws_stream).poll(cx) { + Poll::Ready(_a) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + }, + Err(e) => { + log::error!("🐴 ws connect failed: {:?}", e); + Poll::Ready(()) + }, + }; + } + } +} + +#[pin_project] +struct WsConn { + #[pin] + fut: BoxFuture<'static, Result<(WebSocketStream>, Response), Error>>, +} + +impl WsConn { + fn new(addr: String) -> Self { + Self { + fut: Box::pin(async move { connect_async(&addr).await }), + } + } +} + +impl Future for WsConn { + type Output = Result<(WebSocketStream>, Response), Error>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + return match ready!(self.as_mut().project().fut.poll(cx)) { + Ok(o) => Poll::Ready(Ok(o)), + Err(e) => Poll::Ready(Err(e)), + }; + } + } +} + +struct WsStream { + msg_tx: Option, + ws_rx: Option, + stream: Option>>, +} + +impl Future for WsStream { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (tx, rx) = (self.msg_tx.take().unwrap(), self.ws_rx.take().unwrap()); + let (ws_write, ws_read) = self.stream.take().unwrap().split(); + let to_ws = rx.map(Ok).forward(ws_write); + let from_ws = ws_read.for_each(|message| async { + match message { + Ok(message) => { + match tx.unbounded_send(message) { + Ok(_) => {}, + Err(e) => log::error!("tx send error: {:?}", e), + }; + }, + Err(e) => log::error!("ws read error: {:?}", e), + } + }); + + pin_mut!(to_ws, from_ws); + log::debug!("🐴 ws start poll stream"); + match to_ws.poll_unpin(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => match from_ws.poll_unpin(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + }, + } + } +} + +pub struct WsSender { ws_tx: MsgSender, } -impl WsConnect { +impl WsSender { pub fn new(ws_tx: MsgSender) -> Self { Self { ws_tx } } + pub fn send(&self, msg: Message) -> Result<(), WsError> { let _ = self.ws_tx.unbounded_send(msg)?; Ok(()) } } -pub struct WsRaw { - msg_tx: Option, - ws_rx: Option, -} +#[cfg(test)] +mod tests { + use super::WsController; + use futures_util::{pin_mut, StreamExt}; + use tokio_tungstenite::connect_async; -impl WsRaw { - pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver) -> Self { - WsRaw { - msg_tx: Some(msg_tx), - ws_rx: Some(ws_rx), - } - } - - pub async fn connect(&mut self, addr: &str) -> Result<(), WsError> { - let url = url::Url::parse(addr)?; - match connect_async(url).await { - Ok((stream, _)) => self.bind_stream(stream).await, - Err(e) => Err(WsError::internal().context(e)), - } - } - - async fn bind_stream(&mut self, stream: WebSocketStream>) -> Result<(), WsError> { - let (ws_write, ws_read) = stream.split(); - let (tx, rx) = self.take_mpsc(); - let to_ws = rx.map(Ok).forward(ws_write); - let from_ws = { - ws_read.for_each(|message| async { - match message { - Ok(message) => { - match tx.unbounded_send(message) { - Ok(_) => {}, - Err(e) => log::error!("tx send error: {:?}", e), - }; - }, - Err(e) => log::error!("ws read error: {:?}", e), - } - }) + #[tokio::test] + async fn connect() { + let mut controller = WsController::new(); + let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str()); + let (a, b) = controller.make_connect(&addr); + tokio::select! { + _ = a => println!("write completed"), + _ = b => println!("read completed"), }; + } + + #[tokio::test] + async fn connect_raw() { + let _controller = WsController::new(); + let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str()); + let (tx, rx) = futures_channel::mpsc::unbounded(); + let (ws_write, ws_read) = connect_async(&addr).await.unwrap().0.split(); + let to_ws = rx.map(Ok).forward(ws_write); + let from_ws = ws_read.for_each(|message| async { + tx.unbounded_send(message.unwrap()).unwrap(); + }); + + pin_mut!(to_ws, from_ws); tokio::select! { _ = to_ws => { log::debug!("ws write completed") @@ -166,8 +279,5 @@ impl WsRaw { log::debug!("ws read completed") } }; - Ok(()) } - - fn take_mpsc(&mut self) -> (MsgSender, MsgReceiver) { (self.msg_tx.take().unwrap(), self.ws_rx.take().unwrap()) } }