init server with dispatcher's runtime

This commit is contained in:
appflowy 2022-01-07 23:00:23 +08:00
parent 287698be9e
commit 1e0cef41a9
15 changed files with 102 additions and 65 deletions

View File

@ -77,7 +77,7 @@ impl DocumentWebSocketActor {
.map_err(internal_error)??;
tracing::debug!(
"[DocumentWebSocketActor]: client data: {}:{}, {:?}",
"[DocumentWebSocketActor]: receive: {}:{}, {:?}",
document_client_data.doc_id,
document_client_data.id,
document_client_data.ty

View File

@ -26,7 +26,7 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
let path: &str = c_str.to_str().unwrap();
let server_config = get_client_server_configuration().unwrap();
let config = FlowySDKConfig::new(path, server_config, "appflowy", None).log_filter("debug");
let config = FlowySDKConfig::new(path, server_config, "appflowy").log_filter("debug");
*FLOWY_SDK.write() = Some(Arc::new(FlowySDK::new(config)));
0

View File

@ -272,8 +272,8 @@ impl DocumentWSSink {
Ok(())
},
Some(data) => {
tracing::debug!("[DocumentSink]: Try send: {}:{:?}-{}", data.doc_id, data.ty, data.id());
self.ws_sender.send(data).map_err(internal_error)
tracing::debug!("[DocumentSink]: send: {}:{}-{:?}", data.doc_id, data.id(), data.ty);
self.ws_sender.send(data)
// let _ = tokio::time::timeout(Duration::from_millis(2000),
},
}

View File

@ -9,7 +9,9 @@ use std::sync::Arc;
use tokio::sync::broadcast::Receiver;
impl FlowyRawWebSocket for Arc<WSController> {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
fn initialize(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn start_connect(&self, addr: String, _user_id: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.start(addr).await.map_err(internal_error)?;

View File

@ -15,27 +15,13 @@ use flowy_collaboration::{
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use lib_ws::{WSModule, WebSocketRawMessage};
use std::{
convert::TryInto,
fmt::{Debug, Formatter},
sync::Arc,
};
use tokio::sync::mpsc;
pub(crate) fn spawn_server(receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>) -> Arc<LocalDocumentServer> {
let (server_tx, mut server_rx) = mpsc::unbounded_channel();
let server = Arc::new(LocalDocumentServer::new(server_tx));
tokio::spawn(async move {
while let Some(message) = server_rx.recv().await {
match receivers.get(&message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", message),
Some(handler) => handler.receive_message(message.clone()),
}
}
});
server
}
use tokio::sync::{mpsc, mpsc::UnboundedSender};
pub struct LocalDocumentServer {
pub doc_manager: Arc<ServerDocumentManager>,
@ -49,14 +35,19 @@ impl LocalDocumentServer {
LocalDocumentServer { doc_manager, sender }
}
pub async fn handle_client_data(&self, client_data: DocumentClientWSData) -> Result<(), CollaborateError> {
pub async fn handle_client_data(
&self,
client_data: DocumentClientWSData,
user_id: String,
) -> Result<(), CollaborateError> {
tracing::debug!(
"[LocalDocumentServer] receive client data: {}:{:?} ",
"[LocalDocumentServer] receive: {}:{}-{:?} ",
client_data.doc_id,
client_data.ty
client_data.id(),
client_data.ty,
);
let user = Arc::new(LocalDocumentUser {
user_id: "fake_user_id".to_owned(),
user_id,
ws_sender: self.sender.clone(),
});
let ty = client_data.ty.clone();
@ -146,6 +137,13 @@ impl RevisionUser for LocalDocumentUser {
fn receive(&self, resp: SyncResponse) {
let sender = self.ws_sender.clone();
let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
Ok(_) => {},
Err(e) => {
tracing::error!("LocalDocumentUser send message failed: {}", e);
},
};
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
@ -154,7 +152,7 @@ impl RevisionUser for LocalDocumentUser {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).unwrap();
send_fn(sender, msg);
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
@ -162,7 +160,7 @@ impl RevisionUser for LocalDocumentUser {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).unwrap();
send_fn(sender, msg);
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
@ -170,9 +168,9 @@ impl RevisionUser for LocalDocumentUser {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).unwrap();
send_fn(sender, msg);
},
SyncResponse::NewRevision(_) => {
SyncResponse::NewRevision(mut _repeated_revision) => {
// unimplemented!()
},
}

View File

@ -6,17 +6,20 @@ use lib_infra::future::FutureResult;
use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage};
use crate::services::{
local_ws::local_server::{spawn_server, LocalDocumentServer},
local_ws::local_server::LocalDocumentServer,
ws_conn::{FlowyRawWebSocket, FlowyWSSender},
};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver};
use tokio::sync::{broadcast, broadcast::Receiver, mpsc, mpsc::UnboundedReceiver};
pub struct LocalWebSocket {
receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: LocalWSSender,
server: Arc<LocalDocumentServer>,
server_rx: RwLock<Option<UnboundedReceiver<WebSocketRawMessage>>>,
user_id: Arc<RwLock<Option<String>>>,
}
impl std::default::Default for LocalWebSocket {
@ -24,13 +27,19 @@ impl std::default::Default for LocalWebSocket {
let (state_sender, _) = broadcast::channel(16);
let ws_sender = LocalWSSender::default();
let receivers = Arc::new(DashMap::new());
let server = spawn_server(receivers.clone());
let (server_tx, server_rx) = mpsc::unbounded_channel();
let server = Arc::new(LocalDocumentServer::new(server_tx));
let server_rx = RwLock::new(Some(server_rx));
let user_token = Arc::new(RwLock::new(None));
LocalWebSocket {
receivers,
state_sender,
ws_sender,
server,
server_rx,
user_id: user_token,
}
}
}
@ -38,15 +47,26 @@ impl std::default::Default for LocalWebSocket {
impl LocalWebSocket {
fn spawn_client(&self, _addr: String) {
let mut ws_receiver = self.ws_sender.subscribe();
let server = self.server.clone();
let local_server = self.server.clone();
let user_id = self.user_id.clone();
tokio::spawn(async move {
loop {
// Polling the web socket message sent by user
match ws_receiver.recv().await {
Ok(message) => {
let fut = || async {
let user_id = user_id.read().clone();
if user_id.is_none() {
continue;
}
let user_id = user_id.unwrap();
let server = local_server.clone();
let fut = || async move {
let bytes = Bytes::from(message.data);
let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?;
let _ = server.handle_client_data(client_data).await?;
let _ = server
.handle_client_data(client_data, user_id)
.await
.map_err(internal_error)?;
Ok::<(), FlowyError>(())
};
match fut().await {
@ -62,7 +82,22 @@ impl LocalWebSocket {
}
impl FlowyRawWebSocket for LocalWebSocket {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
fn initialize(&self) -> FutureResult<(), FlowyError> {
let mut server_rx = self.server_rx.write().take().expect("Only take once");
let receivers = self.receivers.clone();
tokio::spawn(async move {
while let Some(message) = server_rx.recv().await {
match receivers.get(&message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", message),
Some(handler) => handler.receive_message(message.clone()),
}
}
});
FutureResult::new(async { Ok(()) })
}
fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError> {
*self.user_id.write() = Some(user_id);
self.spawn_client(addr);
FutureResult::new(async { Ok(()) })
}

View File

@ -9,7 +9,8 @@ use std::sync::Arc;
use tokio::sync::broadcast;
pub trait FlowyRawWebSocket: Send + Sync {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>;
fn initialize(&self) -> FutureResult<(), FlowyError>;
fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError>;
fn stop_connect(&self) -> FutureResult<(), FlowyError>;
fn subscribe_connect_state(&self) -> broadcast::Receiver<WSConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
@ -39,10 +40,17 @@ impl FlowyWebSocketConnect {
}
}
pub async fn start(&self, token: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, token);
pub async fn init(&self) {
match self.inner.initialize().await {
Ok(_) => {},
Err(e) => tracing::error!("FlowyWebSocketConnect init error: {:?}", e),
}
}
pub async fn start(&self, token: String, user_id: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, &token);
self.inner.stop_connect().await?;
let _ = self.inner.start_connect(addr).await?;
let _ = self.inner.start_connect(addr, user_id).await?;
Ok(())
}

View File

@ -6,7 +6,7 @@ use flowy_document::{
core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
errors::{internal_error, FlowyError},
};
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use flowy_net::services::ws_conn::{FlowyWSSender, FlowyWebSocketConnect};
use flowy_user::services::user::UserSession;
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};
@ -71,7 +71,7 @@ impl DocumentWebSocket for DocumentWebSocketAdapter {
module: WSModule::Doc,
data: bytes.to_vec(),
};
let sender = self.ws_conn.ws_sender().map_err(internal_error)?;
let sender = self.ws_conn.ws_sender()?;
sender.send(msg).map_err(internal_error)?;
Ok(())
}

View File

@ -36,7 +36,6 @@ pub struct FlowySDKConfig {
root: String,
log_filter: String,
server_config: ClientServerConfiguration,
ws: Arc<dyn FlowyRawWebSocket>,
}
impl fmt::Debug for FlowySDKConfig {
@ -50,19 +49,12 @@ impl fmt::Debug for FlowySDKConfig {
}
impl FlowySDKConfig {
pub fn new(
root: &str,
server_config: ClientServerConfiguration,
name: &str,
ws: Option<Arc<dyn FlowyRawWebSocket>>,
) -> Self {
let ws = ws.unwrap_or_else(default_web_socket);
pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self {
FlowySDKConfig {
name: name.to_owned(),
root: root.to_owned(),
log_filter: crate_log_filter(None),
server_config,
ws,
}
}
@ -107,7 +99,7 @@ impl FlowySDK {
let ws_conn = Arc::new(FlowyWebSocketConnect::new(
config.server_config.ws_addr(),
config.ws.clone(),
default_web_socket(),
));
let user_session = mk_user_session(&config);
let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config);
@ -146,6 +138,7 @@ fn _init(
dispatch.spawn(async move {
user_session.init();
ws_conn.init().await;
listen_on_websocket(ws_conn.clone());
_listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await;
});
@ -163,9 +156,9 @@ async fn _listen_user_status(
while let Ok(status) = subscribe.recv().await {
let result = || async {
match status {
UserStatus::Login { token } => {
UserStatus::Login { token, user_id } => {
let _ = core.user_did_sign_in(&token).await?;
let _ = ws_conn.start(token).await?;
let _ = ws_conn.start(token, user_id).await?;
},
UserStatus::Logout { .. } => {
core.user_did_logout().await;
@ -177,7 +170,7 @@ async fn _listen_user_status(
},
UserStatus::SignUp { profile, ret } => {
let _ = core.user_did_sign_up(&profile.token).await?;
let _ = ws_conn.start(profile.token.clone()).await?;
let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
let _ = ret.send(());
},
}

View File

@ -38,7 +38,7 @@ impl std::default::Default for FlowySDKTest {
impl FlowySDKTest {
pub fn new(server_config: ClientServerConfiguration, ws: Option<Arc<dyn FlowyRawWebSocket>>) -> Self {
let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string(), None).log_filter("debug");
let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string()).log_filter("debug");
let sdk = FlowySDK::new(config);
std::mem::forget(sdk.dispatcher());
Self { inner: sdk, ws }

View File

@ -5,6 +5,7 @@ use tokio::sync::mpsc;
pub enum UserStatus {
Login {
token: String,
user_id: String,
},
Logout {
token: String,

View File

@ -16,9 +16,10 @@ impl std::default::Default for UserNotifier {
impl UserNotifier {
pub(crate) fn new() -> Self { UserNotifier::default() }
pub(crate) fn notify_login(&self, token: &str) {
pub(crate) fn notify_login(&self, token: &str, user_id: &str) {
let _ = self.user_status_notifier.send(UserStatus::Login {
token: token.to_owned(),
user_id: user_id.to_owned(),
});
}

View File

@ -67,7 +67,7 @@ impl UserSession {
pub fn init(&self) {
if let Ok(session) = self.get_session() {
self.notifier.notify_login(&session.token);
self.notifier.notify_login(&session.token, &session.user_id);
}
}
@ -97,7 +97,7 @@ impl UserSession {
let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?;
let user_profile: UserProfile = user_table.into();
self.notifier.notify_login(&user_profile.token);
self.notifier.notify_login(&user_profile.token, &user_profile.id);
Ok(user_profile)
}
}

View File

@ -206,7 +206,6 @@ impl OpenDocHandle {
result
}
#[tracing::instrument(level = "debug", skip(self, user), err)]
async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone());

View File

@ -122,17 +122,17 @@ impl RevisionSynchronizer {
let server_rev_id = self.rev_id();
tracing::Span::current().record("server_rev_id", &server_rev_id);
match server_rev_id.cmp(&client_rev_id) {
Ordering::Less => tracing::error!(
"[Pong] Client should not send ping and the server should pull the revisions from the client"
),
Ordering::Equal => tracing::debug!("[Pong]: The document:{} is up to date.", doc_id),
Ordering::Less => {
tracing::error!("Client should not send ping and the server should pull the revisions from the client")
},
Ordering::Equal => tracing::debug!("{} is up to date.", doc_id),
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = client_rev_id;
let to_rev_id = server_rev_id;
tracing::trace!("[Pong]: Push revisions to user");
tracing::trace!("Push revisions to user");
let _ = self
.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id)
.await;