using CacheActor method to handle the ws message one by one

This commit is contained in:
appflowy
2021-09-29 17:40:34 +08:00
parent 83acc79cd1
commit 1fdef0914b
44 changed files with 658 additions and 548 deletions

View File

@ -58,6 +58,7 @@ md5 = "0.7.0"
futures-core = { version = "0.3", default-features = false }
pin-project = "1.0.0"
byteorder = {version = "1.3.4"}
async-stream = "0.3.2"
flowy-user = { path = "../rust-lib/flowy-user" }
flowy-workspace = { path = "../rust-lib/flowy-workspace" }
@ -93,10 +94,12 @@ path = "src/main.rs"
parking_lot = "0.11"
once_cell = "1.7.2"
linkify = "0.5.0"
flowy-user = { path = "../rust-lib/flowy-user" }
flowy-workspace = { path = "../rust-lib/flowy-workspace" }
flowy-user = { path = "../rust-lib/flowy-user", features = ["http_server"] }
flowy-workspace = { path = "../rust-lib/flowy-workspace", features = ["http_server"] }
flowy-ws = { path = "../rust-lib/flowy-ws" }
flowy-sdk = { path = "../rust-lib/flowy-sdk" }
flowy-test = { path = "../rust-lib/flowy-test" }
flowy-infra = { path = "../rust-lib/flowy-infra" }
flowy-ot = { path = "../rust-lib/flowy-ot" }
flowy-ot = { path = "../rust-lib/flowy-ot" }
flowy-document = { path = "../rust-lib/flowy-document", features = ["flowy_test", "http_server"] }
flowy-sqlite = { path = "../rust-lib/flowy-sqlite" }

View File

@ -16,7 +16,6 @@ use crate::{
service::{
app::router as app,
doc::router as doc,
make_ws_biz_handlers,
user::router as user,
view::router as view,
workspace::router as workspace,
@ -31,11 +30,10 @@ pub struct Application {
}
impl Application {
pub async fn build(configuration: Settings) -> Result<Self, std::io::Error> {
pub async fn build(configuration: Settings, app_ctx: AppContext) -> Result<Self, std::io::Error> {
let address = format!("{}:{}", configuration.application.host, configuration.application.port);
let listener = TcpListener::bind(&address)?;
let port = listener.local_addr().unwrap().port();
let app_ctx = init_app_context(&configuration).await;
let server = run(listener, app_ctx)?;
Ok(Self { port, server })
}
@ -46,13 +44,9 @@ impl Application {
}
pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result<Server, std::io::Error> {
let AppContext { ws_server, pg_pool } = app_ctx;
let ws_server = Data::new(ws_server);
let pg_pool = Data::new(pg_pool);
let domain = domain();
let secret: String = secret();
let ws_biz_handlers = Data::new(make_ws_biz_handlers(pg_pool.clone()));
actix_rt::spawn(period_check(pg_pool.clone()));
actix_rt::spawn(period_check(app_ctx.pg_pool.clone()));
let server = HttpServer::new(move || {
App::new()
@ -63,9 +57,11 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result<Server, std::io
.app_data(web::JsonConfig::default().limit(4096))
.service(ws_scope())
.service(user_scope())
.app_data(ws_server.clone())
.app_data(pg_pool.clone())
.app_data(ws_biz_handlers.clone())
.app_data(app_ctx.ws_server.clone())
.app_data(app_ctx.pg_pool.clone())
.app_data(app_ctx.ws_bizs.clone())
.app_data(app_ctx.doc_biz.clone())
.app_data(app_ctx.runtime.clone())
})
.listen(listener)?
.run();
@ -129,8 +125,10 @@ fn user_scope() -> Scope {
)
}
async fn init_app_context(configuration: &Settings) -> AppContext {
let _ = crate::service::log::Builder::new("flowy").env_filter("Trace").build();
pub async fn init_app_context(configuration: &Settings) -> AppContext {
let _ = crate::service::log::Builder::new("flowy-server")
.env_filter("Trace")
.build();
let pg_pool = get_connection_pool(&configuration.database).await.expect(&format!(
"Failed to connect to Postgres at {:?}.",
configuration.database

View File

@ -1,18 +1,48 @@
use crate::service::ws::WsServer;
use crate::service::{
doc::doc::DocBiz,
ws::{WsBizHandlers, WsServer},
};
use actix::Addr;
use actix_web::web::Data;
use flowy_ws::WsModule;
use sqlx::PgPool;
use std::{io, sync::Arc};
pub type FlowyRuntime = tokio::runtime::Runtime;
#[derive(Clone)]
pub struct AppContext {
pub ws_server: Addr<WsServer>,
pub pg_pool: PgPool,
pub ws_server: Data<Addr<WsServer>>,
pub pg_pool: Data<PgPool>,
pub ws_bizs: Data<WsBizHandlers>,
pub doc_biz: Data<Arc<DocBiz>>,
pub runtime: Data<FlowyRuntime>,
}
impl AppContext {
pub fn new(ws_server: Addr<WsServer>, db_pool: PgPool) -> Self {
let ws_server = Data::new(ws_server);
let pg_pool = Data::new(db_pool);
let runtime = Data::new(runtime().unwrap());
let mut ws_bizs = WsBizHandlers::new();
let doc_biz = Arc::new(DocBiz::new(pg_pool.clone()));
ws_bizs.register(WsModule::Doc, doc_biz.clone());
AppContext {
ws_server,
pg_pool: db_pool,
pg_pool,
ws_bizs: Data::new(ws_bizs),
doc_biz: Data::new(doc_biz),
runtime,
}
}
}
fn runtime() -> io::Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_multi_thread()
.thread_name("flowy-server-rt")
.enable_io()
.enable_time()
.build()
}

View File

@ -1,7 +1,7 @@
pub mod application;
pub mod config;
mod context;
pub mod context;
mod entities;
mod middleware;
mod service;
pub mod service;
mod sqlx_ext;

View File

@ -1,9 +1,13 @@
use backend::{application::Application, config::get_configuration};
use backend::{
application::{init_app_context, Application},
config::get_configuration,
};
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let configuration = get_configuration().expect("Failed to read configuration.");
let application = Application::build(configuration).await?;
let app_ctx = init_app_context(&configuration).await;
let application = Application::build(configuration, app_ctx).await?;
application.run_until_stopped().await?;
Ok(())

View File

@ -0,0 +1,121 @@
use crate::{
entities::doc::{DocTable, DOC_TABLE},
sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
};
use anyhow::Context;
use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams};
use flowy_net::errors::ServerError;
use sqlx::{postgres::PgArguments, PgPool, Postgres};
use uuid::Uuid;
#[tracing::instrument(level = "debug", skip(transaction), err)]
pub(crate) async fn create_doc(
transaction: &mut DBTransaction<'_>,
params: CreateDocParams,
) -> Result<(), ServerError> {
let uuid = Uuid::parse_str(&params.id)?;
let (sql, args) = NewDocSqlBuilder::new(uuid).data(params.data).build()?;
let _ = sqlx::query_with(&sql, args)
.execute(transaction)
.await
.map_err(map_sqlx_error)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(pool), err)]
pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result<Doc, ServerError> {
let doc_id = Uuid::parse_str(&params.doc_id)?;
let mut transaction = pool
.begin()
.await
.context("Failed to acquire a Postgres connection to read doc")?;
let builder = SqlBuilder::select(DOC_TABLE).add_field("*").and_where_eq("id", &doc_id);
let (sql, args) = builder.build()?;
// TODO: benchmark the speed of different documents with different size
let doc: Doc = sqlx::query_as_with::<Postgres, DocTable, PgArguments>(&sql, args)
.fetch_one(&mut transaction)
.await
.map_err(map_sqlx_error)?
.into();
transaction
.commit()
.await
.context("Failed to commit SQL transaction to read doc.")?;
Ok(doc)
}
#[tracing::instrument(level = "debug", skip(pool, params), err)]
pub(crate) async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> {
let doc_id = Uuid::parse_str(&params.doc_id)?;
let mut transaction = pool
.begin()
.await
.context("Failed to acquire a Postgres connection to update doc")?;
let data = Some(params.take_data());
let (sql, args) = SqlBuilder::update(DOC_TABLE)
.add_some_arg("data", data)
.add_arg("rev_id", params.rev_id)
.and_where_eq("id", doc_id)
.build()?;
sqlx::query_with(&sql, args)
.execute(&mut transaction)
.await
.map_err(map_sqlx_error)?;
transaction
.commit()
.await
.context("Failed to commit SQL transaction to update doc.")?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(transaction), err)]
pub(crate) async fn delete_doc(transaction: &mut DBTransaction<'_>, doc_id: Uuid) -> Result<(), ServerError> {
let (sql, args) = SqlBuilder::delete(DOC_TABLE).and_where_eq("id", doc_id).build()?;
let _ = sqlx::query_with(&sql, args)
.execute(transaction)
.await
.map_err(map_sqlx_error)?;
Ok(())
}
pub struct NewDocSqlBuilder {
table: DocTable,
}
impl NewDocSqlBuilder {
pub fn new(id: Uuid) -> Self {
let table = DocTable {
id,
data: "".to_owned(),
rev_id: 0,
};
Self { table }
}
pub fn data(mut self, data: String) -> Self {
self.table.data = data;
self
}
pub fn build(self) -> Result<(String, PgArguments), ServerError> {
let (sql, args) = SqlBuilder::create(DOC_TABLE)
.add_arg("id", self.table.id)
.add_arg("data", self.table.data)
.add_arg("rev_id", self.table.rev_id)
.build()?;
Ok((sql, args))
}
}

View File

@ -1,92 +1,89 @@
use super::sql_builder::*;
use crate::{
entities::doc::{DocTable, DOC_TABLE},
sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
use super::edit_doc::EditDocContext;
use crate::service::{
doc::{
read_doc,
ws_actor::{DocWsMsg, DocWsMsgActor},
},
ws::{WsBizHandler, WsClientData},
};
use anyhow::Context;
use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams};
use actix_web::web::Data;
use dashmap::DashMap;
use flowy_document::protobuf::QueryDocParams;
use flowy_net::errors::ServerError;
use sqlx::{postgres::PgArguments, PgPool, Postgres};
use uuid::Uuid;
#[tracing::instrument(level = "debug", skip(transaction), err)]
pub(crate) async fn create_doc(
transaction: &mut DBTransaction<'_>,
params: CreateDocParams,
) -> Result<(), ServerError> {
let uuid = Uuid::parse_str(&params.id)?;
let (sql, args) = NewDocSqlBuilder::new(uuid).data(params.data).build()?;
let _ = sqlx::query_with(&sql, args)
.execute(transaction)
.await
.map_err(map_sqlx_error)?;
use protobuf::Message;
use sqlx::PgPool;
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot};
Ok(())
pub struct DocBiz {
pub manager: Arc<DocManager>,
sender: mpsc::Sender<DocWsMsg>,
pg_pool: Data<PgPool>,
}
#[tracing::instrument(level = "debug", skip(pool), err)]
pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result<Doc, ServerError> {
let doc_id = Uuid::parse_str(&params.doc_id)?;
let mut transaction = pool
.begin()
.await
.context("Failed to acquire a Postgres connection to read doc")?;
let builder = SqlBuilder::select(DOC_TABLE).add_field("*").and_where_eq("id", &doc_id);
let (sql, args) = builder.build()?;
// TODO: benchmark the speed of different documents with different size
let doc: Doc = sqlx::query_as_with::<Postgres, DocTable, PgArguments>(&sql, args)
.fetch_one(&mut transaction)
.await
.map_err(map_sqlx_error)?
.into();
transaction
.commit()
.await
.context("Failed to commit SQL transaction to read doc.")?;
Ok(doc)
impl DocBiz {
pub fn new(pg_pool: Data<PgPool>) -> Self {
let manager = Arc::new(DocManager::new());
let (tx, rx) = mpsc::channel(100);
let actor = DocWsMsgActor::new(rx, manager.clone());
tokio::task::spawn(actor.run());
Self {
manager,
sender: tx,
pg_pool,
}
}
}
#[tracing::instrument(level = "debug", skip(pool, params), err)]
pub(crate) async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> {
let doc_id = Uuid::parse_str(&params.doc_id)?;
let mut transaction = pool
.begin()
.await
.context("Failed to acquire a Postgres connection to update doc")?;
impl WsBizHandler for DocBiz {
fn receive_data(&self, client_data: WsClientData) {
let (ret, rx) = oneshot::channel();
let sender = self.sender.clone();
let pool = self.pg_pool.clone();
let data = Some(params.take_data());
let (sql, args) = SqlBuilder::update(DOC_TABLE)
.add_some_arg("data", data)
.add_arg("rev_id", params.rev_id)
.and_where_eq("id", doc_id)
.build()?;
sqlx::query_with(&sql, args)
.execute(&mut transaction)
.await
.map_err(map_sqlx_error)?;
transaction
.commit()
.await
.context("Failed to commit SQL transaction to update doc.")?;
Ok(())
actix_rt::spawn(async move {
let msg = DocWsMsg::ClientData {
data: client_data,
ret,
pool,
};
match sender.send(msg).await {
Ok(_) => {},
Err(e) => log::error!("{}", e),
}
match rx.await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
};
});
}
}
#[tracing::instrument(level = "debug", skip(transaction), err)]
pub(crate) async fn delete_doc(transaction: &mut DBTransaction<'_>, doc_id: Uuid) -> Result<(), ServerError> {
let (sql, args) = SqlBuilder::delete(DOC_TABLE).and_where_eq("id", doc_id).build()?;
let _ = sqlx::query_with(&sql, args)
.execute(transaction)
.await
.map_err(map_sqlx_error)?;
Ok(())
pub struct DocManager {
docs_map: DashMap<String, Arc<EditDocContext>>,
}
impl DocManager {
pub fn new() -> Self {
Self {
docs_map: DashMap::new(),
}
}
pub async fn get(&self, doc_id: &str, pg_pool: Data<PgPool>) -> Result<Option<Arc<EditDocContext>>, ServerError> {
match self.docs_map.get(doc_id) {
None => {
let params = QueryDocParams {
doc_id: doc_id.to_string(),
..Default::default()
};
let doc = read_doc(pg_pool.get_ref(), params).await?;
let edit_doc = Arc::new(EditDocContext::new(doc)?);
self.docs_map.insert(doc_id.to_string(), edit_doc.clone());
Ok(Some(edit_doc))
},
Some(ctx) => Ok(Some(ctx.clone())),
}
}
}

View File

View File

@ -1,9 +1,8 @@
use crate::service::{
doc::update_doc,
util::md5,
ws::{entities::Socket, WsClientData, WsMessageAdaptor, WsUser},
};
use actix_web::web::Data;
use byteorder::{BigEndian, WriteBytesExt};
use bytes::Bytes;
use dashmap::DashMap;
@ -20,7 +19,7 @@ use flowy_ot::{
use flowy_ws::WsMessage;
use parking_lot::RwLock;
use protobuf::Message;
use sqlx::PgPool;
use std::{
convert::TryInto,
sync::{
@ -35,16 +34,15 @@ struct EditUser {
socket: Socket,
}
pub(crate) struct EditDocContext {
pub struct EditDocContext {
doc_id: String,
rev_id: AtomicI64,
document: Arc<RwLock<Document>>,
pg_pool: Data<PgPool>,
users: DashMap<String, EditUser>,
}
impl EditDocContext {
pub(crate) fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
pub fn new(doc: Doc) -> Result<Self, ServerError> {
let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
let users = DashMap::new();
@ -52,17 +50,14 @@ impl EditDocContext {
doc_id: doc.id.clone(),
rev_id: AtomicI64::new(doc.rev_id),
document,
pg_pool,
users,
})
}
pub fn doc_json(&self) -> String { self.document.read().to_json() }
#[tracing::instrument(level = "debug", skip(self, client_data, revision))]
pub(crate) async fn apply_revision(
&self,
client_data: WsClientData,
revision: Revision,
) -> Result<(), ServerError> {
pub async fn apply_revision(&self, client_data: WsClientData, revision: Revision) -> Result<(), ServerError> {
let _ = self.verify_md5(&revision)?;
// Opti: find out another way to keep the user socket available.
let user = EditUser {
@ -85,7 +80,7 @@ impl EditDocContext {
// send the prime delta to the client. Client should compose the this prime
// delta.
let (cli_prime, server_prime) = self.transform(&revision.delta).map_err(internal_error)?;
let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
let _ = self.update_document_delta(server_prime)?;
log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
@ -101,7 +96,7 @@ impl EditDocContext {
.do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
.map_err(internal_error)?;
} else {
let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?;
let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
let _ = self.update_document_delta(delta)?;
cli_socket
.do_send(mk_acked_ws_message(&revision))
@ -118,12 +113,12 @@ impl EditDocContext {
}
fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
let delta_data = delta.into_bytes();
let delta_data = delta.to_bytes().to_vec();
let md5 = md5(&delta_data);
let revision = Revision {
base_rev_id,
rev_id: self.rev_id.load(SeqCst),
delta: delta_data,
delta_data,
md5,
doc_id: self.doc_id.to_string(),
ty: RevType::Remote,
@ -160,7 +155,7 @@ impl EditDocContext {
}
fn verify_md5(&self, revision: &Revision) -> Result<(), ServerError> {
if md5(&revision.delta) != revision.md5 {
if md5(&revision.delta_data) != revision.md5 {
return Err(ServerError::internal().context("Delta md5 not match"));
}
Ok(())
@ -173,7 +168,7 @@ impl EditDocContext {
params.set_doc_id(self.doc_id.clone());
params.set_data(self.document.read().to_json());
params.set_rev_id(revision.rev_id);
let _ = update_doc(self.pg_pool.get_ref(), params).await?;
// let _ = update_doc(self.pg_pool.get_ref(), params).await?;
Ok(())
}
}

View File

@ -1,8 +1,8 @@
mod doc;
mod edit_doc_context;
pub mod crud;
pub mod doc;
pub mod edit_doc;
pub mod router;
mod sql_builder;
pub mod ws_handler;
mod ws_actor;
pub(crate) use doc::*;
pub(crate) use crud::*;
pub use router::*;

View File

@ -1,39 +0,0 @@
use crate::{
entities::doc::{DocTable, DOC_TABLE},
sqlx_ext::SqlBuilder,
};
use flowy_net::errors::ServerError;
use sqlx::postgres::PgArguments;
use uuid::Uuid;
pub struct NewDocSqlBuilder {
table: DocTable,
}
impl NewDocSqlBuilder {
pub fn new(id: Uuid) -> Self {
let table = DocTable {
id,
data: "".to_owned(),
rev_id: 0,
};
Self { table }
}
pub fn data(mut self, data: String) -> Self {
self.table.data = data;
self
}
pub fn build(self) -> Result<(String, PgArguments), ServerError> {
let (sql, args) = SqlBuilder::create(DOC_TABLE)
.add_arg("id", self.table.id)
.add_arg("data", self.table.data)
.add_arg("rev_id", self.table.rev_id)
.build()?;
Ok((sql, args))
}
}

View File

@ -0,0 +1,101 @@
use crate::service::{doc::doc::DocManager, util::parse_from_bytes, ws::WsClientData};
use actix_rt::task::spawn_blocking;
use actix_web::web::Data;
use async_stream::stream;
use flowy_document::protobuf::{Revision, WsDataType, WsDocumentData};
use flowy_net::errors::{internal_error, Result as DocResult};
use futures::stream::StreamExt;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub enum DocWsMsg {
ClientData {
data: WsClientData,
pool: Data<PgPool>,
ret: oneshot::Sender<DocResult<()>>,
},
}
pub struct DocWsMsgActor {
receiver: Option<mpsc::Receiver<DocWsMsg>>,
doc_manager: Arc<DocManager>,
}
impl DocWsMsgActor {
pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<DocManager>) -> Self {
Self {
receiver: Some(receiver),
doc_manager: manager,
}
}
pub async fn run(mut self) {
let mut receiver = self
.receiver
.take()
.expect("DocActor's receiver should only take one time");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream.for_each(|msg| self.handle_message(msg)).await;
}
async fn handle_message(&self, msg: DocWsMsg) {
match msg {
DocWsMsg::ClientData { data, pool, ret } => {
ret.send(self.handle_client_data(data, pool).await);
},
}
}
async fn handle_client_data(&self, data: WsClientData, pool: Data<PgPool>) -> DocResult<()> {
let bytes = data.data.clone();
let document_data = spawn_blocking(move || {
let document_data: WsDocumentData = parse_from_bytes(&bytes)?;
DocResult::Ok(document_data)
})
.await
.map_err(internal_error)??;
match document_data.ty {
WsDataType::Acked => {},
WsDataType::PushRev => self.handle_push_rev(data, document_data.data, pool).await?,
WsDataType::PullRev => {},
WsDataType::Conflict => {},
}
Ok(())
}
async fn handle_push_rev(
&self,
client_data: WsClientData,
revision_data: Vec<u8>,
pool: Data<PgPool>,
) -> DocResult<()> {
let revision = spawn_blocking(move || {
let revision: Revision = parse_from_bytes(&revision_data)?;
DocResult::Ok(revision)
})
.await
.map_err(internal_error)??;
match self.doc_manager.get(&revision.doc_id, pool).await? {
Some(ctx) => {
ctx.apply_revision(client_data, revision).await;
Ok(())
},
None => {
//
Ok(())
},
}
}
}

View File

@ -1,100 +0,0 @@
use super::edit_doc_context::EditDocContext;
use crate::service::{
doc::read_doc,
util::parse_from_bytes,
ws::{WsBizHandler, WsClientData},
};
use actix_web::web::Data;
use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData};
use flowy_net::errors::ServerError;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use protobuf::Message;
use sqlx::PgPool;
use std::{collections::HashMap, sync::Arc};
pub struct DocWsBizHandler {
doc_manager: Arc<EditDocManager>,
}
impl DocWsBizHandler {
pub fn new(pg_pool: Data<PgPool>) -> Self {
Self {
doc_manager: Arc::new(EditDocManager::new(pg_pool)),
}
}
}
impl WsBizHandler for DocWsBizHandler {
fn receive_data(&self, client_data: WsClientData) {
let doc_manager = self.doc_manager.clone();
actix_rt::spawn(async move {
let result = doc_manager.handle(client_data).await;
match result {
Ok(_) => {},
Err(e) => log::error!("WsBizHandler handle data error: {:?}", e),
}
});
}
}
struct EditDocManager {
pg_pool: Data<PgPool>,
edit_docs: RwLock<HashMap<String, Arc<EditDocContext>>>,
}
impl EditDocManager {
fn new(pg_pool: Data<PgPool>) -> Self {
Self {
pg_pool,
edit_docs: RwLock::new(HashMap::new()),
}
}
async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> {
let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?;
match document_data.ty {
WsDataType::Acked => {
// Do nothing,
},
WsDataType::PushRev => {
let revision: Revision = parse_from_bytes(&document_data.data)?;
let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
tokio::spawn(async move {
match edited_doc.apply_revision(client_data, revision).await {
Ok(_) => {},
Err(e) => log::error!("Doc apply revision failed: {:?}", e),
}
});
},
WsDataType::PullRev => {
// Do nothing
},
WsDataType::Conflict => {
unimplemented!()
},
}
Ok(())
}
async fn get_edit_doc(&self, doc_id: &str) -> Result<Arc<EditDocContext>, ServerError> {
// Opti: using lock free map instead?
let edit_docs = self.edit_docs.upgradable_read();
if let Some(doc) = edit_docs.get(doc_id) {
return Ok(doc.clone());
} else {
let mut edit_docs = RwLockUpgradableReadGuard::upgrade(edit_docs);
let pg_pool = self.pg_pool.clone();
let params = QueryDocParams {
doc_id: doc_id.to_string(),
..Default::default()
};
let doc = read_doc(pg_pool.get_ref(), params).await?;
let edit_doc = Arc::new(EditDocContext::new(doc, self.pg_pool.clone())?);
edit_docs.insert(doc_id.to_string(), edit_doc.clone());
Ok(edit_doc)
}
}
}

View File

@ -30,7 +30,7 @@ impl Builder {
.with_target(true)
.with_max_level(tracing::Level::DEBUG)
.with_writer(std::io::stderr)
.with_thread_ids(false)
.with_thread_ids(true)
.compact()
.finish()
.with(env_filter);

View File

@ -1,9 +1,3 @@
use crate::service::{doc::ws_handler::DocWsBizHandler, ws::WsBizHandlers};
use actix_web::web::Data;
use flowy_ws::WsModule;
use sqlx::PgPool;
use std::sync::Arc;
pub mod app;
pub mod doc;
pub(crate) mod log;
@ -12,16 +6,3 @@ pub(crate) mod util;
pub mod view;
pub mod workspace;
pub mod ws;
pub fn make_ws_biz_handlers(pg_pool: Data<PgPool>) -> WsBizHandlers {
let mut ws_biz_handlers = WsBizHandlers::new();
// doc
let doc_biz_handler = DocWsBizHandler::new(pg_pool);
ws_biz_handlers.register(WsModule::Doc, wrap(doc_biz_handler));
//
ws_biz_handlers
}
fn wrap<T>(val: T) -> Arc<T> { Arc::new(val) }

View File

@ -8,14 +8,7 @@ use flowy_net::{
};
use flowy_user::{
entities::parser::{UserEmail, UserName, UserPassword},
protobuf::{
SignInParams,
SignInResponse,
SignUpParams,
SignUpResponse,
UpdateUserParams,
UserProfile,
},
protobuf::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
};
use crate::{
@ -28,10 +21,8 @@ use super::AUTHORIZED_USERS;
use crate::service::user::user_default::create_default_workspace;
pub async fn sign_in(pool: &PgPool, params: SignInParams) -> Result<SignInResponse, ServerError> {
let email =
UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?;
let password = UserPassword::parse(params.password)
.map_err(|e| ServerError::params_invalid().context(e))?;
let email = UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?;
let password = UserPassword::parse(params.password).map_err(|e| ServerError::params_invalid().context(e))?;
let mut transaction = pool
.begin()
@ -62,16 +53,10 @@ pub async fn sign_out(logged_user: LoggedUser) -> Result<FlowyResponse, ServerEr
Ok(FlowyResponse::success())
}
pub async fn register_user(
pool: &PgPool,
params: SignUpParams,
) -> Result<FlowyResponse, ServerError> {
let name =
UserName::parse(params.name).map_err(|e| ServerError::params_invalid().context(e))?;
let email =
UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?;
let password = UserPassword::parse(params.password)
.map_err(|e| ServerError::params_invalid().context(e))?;
pub async fn register_user(pool: &PgPool, params: SignUpParams) -> Result<FlowyResponse, ServerError> {
let name = UserName::parse(params.name).map_err(|e| ServerError::params_invalid().context(e))?;
let email = UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?;
let password = UserPassword::parse(params.password).map_err(|e| ServerError::params_invalid().context(e))?;
let mut transaction = pool
.begin()
@ -79,14 +64,9 @@ pub async fn register_user(
.context("Failed to acquire a Postgres connection to register user")?;
let _ = is_email_exist(&mut transaction, email.as_ref()).await?;
let response_data = insert_new_user(
&mut transaction,
name.as_ref(),
email.as_ref(),
password.as_ref(),
)
.await
.context("Failed to insert user")?;
let response_data = insert_new_user(&mut transaction, name.as_ref(), email.as_ref(), password.as_ref())
.await
.context("Failed to insert user")?;
let logged_user = LoggedUser::new(&response_data.user_id);
AUTHORIZED_USERS.store_auth(logged_user, true);
@ -111,12 +91,11 @@ pub(crate) async fn get_user_profile(
.context("Failed to acquire a Postgres connection to get user detail")?;
let id = logged_user.as_uuid()?;
let user_table =
sqlx::query_as::<Postgres, UserTable>("SELECT * FROM user_table WHERE id = $1")
.bind(id)
.fetch_one(&mut transaction)
.await
.map_err(|err| ServerError::internal().context(err))?;
let user_table = sqlx::query_as::<Postgres, UserTable>("SELECT * FROM user_table WHERE id = $1")
.bind(id)
.fetch_one(&mut transaction)
.await
.map_err(|err| ServerError::internal().context(err))?;
transaction
.commit()
@ -146,11 +125,7 @@ pub(crate) async fn set_user_profile(
let name = match params.has_name() {
false => None,
true => Some(
UserName::parse(params.get_name().to_owned())
.map_err(invalid_params)?
.0,
),
true => Some(UserName::parse(params.get_name().to_owned()).map_err(invalid_params)?.0),
};
let email = match params.has_email() {
@ -165,8 +140,7 @@ pub(crate) async fn set_user_profile(
let password = match params.has_password() {
false => None,
true => {
let password =
UserPassword::parse(params.get_password().to_owned()).map_err(invalid_params)?;
let password = UserPassword::parse(params.get_password().to_owned()).map_err(invalid_params)?;
let password = hash_password(password.as_ref())?;
Some(password)
},
@ -192,10 +166,7 @@ pub(crate) async fn set_user_profile(
Ok(FlowyResponse::success())
}
async fn is_email_exist(
transaction: &mut DBTransaction<'_>,
email: &str,
) -> Result<(), ServerError> {
async fn is_email_exist(transaction: &mut DBTransaction<'_>, email: &str) -> Result<(), ServerError> {
let result = sqlx::query(r#"SELECT email FROM user_table WHERE email = $1"#)
.bind(email)
.fetch_optional(transaction)

View File

@ -6,6 +6,7 @@ use crate::{
},
sqlx_ext::{map_sqlx_error, DBTransaction},
};
use flowy_net::errors::ServerError;
use flowy_workspace::{
entities::view::DOC_DEFAULT_DATA,

View File

@ -5,21 +5,19 @@ use actix_web::{
use sqlx::PgPool;
use flowy_net::errors::ServerError;
use flowy_workspace::protobuf::{
CreateViewParams,
DeleteViewParams,
QueryViewParams,
UpdateViewParams,
};
use flowy_workspace::protobuf::{CreateViewParams, DeleteViewParams, QueryViewParams, UpdateViewParams};
use crate::service::{
doc::doc::DocBiz,
util::parse_from_payload,
view::{create_view, delete_view, read_view, update_view},
};
use std::sync::Arc;
pub async fn create_handler(
payload: Payload,
pool: Data<PgPool>,
_doc_biz: Data<Arc<DocBiz>>,
) -> Result<HttpResponse, ServerError> {
let params: CreateViewParams = parse_from_payload(payload).await?;
let resp = create_view(pool.get_ref(), params).await?;
@ -29,25 +27,20 @@ pub async fn create_handler(
pub async fn read_handler(
payload: Payload,
pool: Data<PgPool>,
doc_biz: Data<Arc<DocBiz>>,
) -> Result<HttpResponse, ServerError> {
let params: QueryViewParams = parse_from_payload(payload).await?;
let resp = read_view(pool.get_ref(), params).await?;
let resp = read_view(pool.get_ref(), params, doc_biz).await?;
Ok(resp.into())
}
pub async fn update_handler(
payload: Payload,
pool: Data<PgPool>,
) -> Result<HttpResponse, ServerError> {
pub async fn update_handler(payload: Payload, pool: Data<PgPool>) -> Result<HttpResponse, ServerError> {
let params: UpdateViewParams = parse_from_payload(payload).await?;
let resp = update_view(pool.get_ref(), params).await?;
Ok(resp.into())
}
pub async fn delete_handler(
payload: Payload,
pool: Data<PgPool>,
) -> Result<HttpResponse, ServerError> {
pub async fn delete_handler(payload: Payload, pool: Data<PgPool>) -> Result<HttpResponse, ServerError> {
let params: DeleteViewParams = parse_from_payload(payload).await?;
let resp = delete_view(pool.get_ref(), &params.view_id).await?;
Ok(resp.into())

View File

@ -18,11 +18,13 @@ use flowy_workspace::{
use crate::{
entities::workspace::{ViewTable, VIEW_TABLE},
service::{
doc::{create_doc, delete_doc},
doc::{create_doc, delete_doc, doc::DocBiz},
view::sql_builder::*,
},
sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
};
use actix_web::web::Data;
use std::sync::Arc;
pub(crate) async fn create_view(pool: &PgPool, params: CreateViewParams) -> Result<FlowyResponse, ServerError> {
let mut transaction = pool
@ -67,7 +69,11 @@ pub(crate) async fn create_view_with_transaction(
Ok(view)
}
pub(crate) async fn read_view(pool: &PgPool, params: QueryViewParams) -> Result<FlowyResponse, ServerError> {
pub(crate) async fn read_view(
pool: &PgPool,
params: QueryViewParams,
_doc_biz: Data<Arc<DocBiz>>,
) -> Result<FlowyResponse, ServerError> {
let view_id = check_view_id(params.view_id)?;
let mut transaction = pool
.begin()

View File

@ -1,7 +1,7 @@
use crate::service::ws::{WsBizHandlers, WsClient, WsServer, WsUser};
use actix::Addr;
use crate::service::user::LoggedUser;
use crate::{context::FlowyRuntime, service::user::LoggedUser};
use actix_web::{
get,
web::{Data, Path, Payload},
@ -17,12 +17,14 @@ pub async fn establish_ws_connection(
payload: Payload,
token: Path<String>,
server: Data<Addr<WsServer>>,
runtime: Data<FlowyRuntime>,
biz_handlers: Data<WsBizHandlers>,
) -> Result<HttpResponse, Error> {
log::info!("establish_ws_connection");
match LoggedUser::from_token(token.clone()) {
Ok(user) => {
let ws_user = WsUser::new(user.clone());
let client = WsClient::new(ws_user, server.get_ref().clone(), biz_handlers);
let client = WsClient::new(ws_user, server.get_ref().clone(), biz_handlers, runtime);
let result = ws::start(client, &request, payload);
match result {
Ok(response) => Ok(response.into()),

View File

@ -1,5 +1,6 @@
use crate::{
config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
context::FlowyRuntime,
service::{
user::LoggedUser,
ws::{
@ -38,16 +39,23 @@ pub struct WsClient {
user: Arc<WsUser>,
server: Addr<WsServer>,
biz_handlers: Data<WsBizHandlers>,
runtime: Data<FlowyRuntime>,
hb: Instant,
}
impl WsClient {
pub fn new(user: WsUser, server: Addr<WsServer>, biz_handlers: Data<WsBizHandlers>) -> Self {
pub fn new(
user: WsUser,
server: Addr<WsServer>,
biz_handlers: Data<WsBizHandlers>,
runtime: Data<FlowyRuntime>,
) -> Self {
Self {
user: Arc::new(user),
server,
biz_handlers,
hb: Instant::now(),
runtime,
}
}
@ -77,7 +85,7 @@ impl WsClient {
socket,
data: Bytes::from(message.data),
};
handler.receive_data(client_data)
handler.receive_data(client_data);
},
}
}

View File

@ -46,9 +46,7 @@ impl Handler<Disconnect> for WsServer {
impl Handler<WsMessageAdaptor> for WsServer {
type Result = ();
fn handle(&mut self, _msg: WsMessageAdaptor, _ctx: &mut Context<Self>) -> Self::Result {
unimplemented!()
}
fn handle(&mut self, _msg: WsMessageAdaptor, _ctx: &mut Context<Self>) -> Self::Result { unimplemented!() }
}
impl actix::Supervised for WsServer {

View File

@ -3,5 +3,12 @@ use crate::document::helper::{DocScript, DocumentTest};
#[actix_rt::test]
async fn edit_doc_insert_text() {
let test = DocumentTest::new().await;
test.run_scripts(vec![DocScript::SendText("abc")]).await;
test.run_scripts(vec![
DocScript::SendText(0, "abc"),
DocScript::SendText(3, "123"),
DocScript::SendText(6, "efg"),
DocScript::AssertClient(r#"[{"insert":"abc123efg\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc123efg\n"}]"#),
])
.await;
}

View File

@ -1,32 +1,25 @@
// use crate::helper::*;
use crate::helper::{spawn_server, TestServer};
use actix_web::web::Data;
use flowy_document::{
entities::doc::{DocDelta, QueryDocParams},
module::FlowyDocument,
services::doc::edit_doc_context::EditDocContext,
entities::doc::QueryDocParams,
services::doc::edit_doc_context::EditDocContext as ClientEditDocContext,
};
use flowy_net::config::ServerConfig;
use flowy_ot::core::Delta;
use flowy_test::{workspace::ViewTest, FlowyTest, FlowyTestSDK};
use flowy_user::services::user::UserSession;
use std::{str::FromStr, sync::Arc};
use tokio::time::{interval, Duration};
use flowy_test::{workspace::ViewTest, FlowyTest};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
pub struct DocumentTest {
server: TestServer,
flowy_test: FlowyTest,
flowy_document: Arc<FlowyDocument>,
user_session: Arc<UserSession>,
edit_context: Arc<EditDocContext>,
}
#[derive(Clone)]
pub enum DocScript {
SendText(&'static str),
SendBinary(Vec<u8>),
SendText(usize, &'static str),
AssertClient(&'static str),
AssertServer(&'static str),
}
impl DocumentTest {
@ -34,46 +27,57 @@ impl DocumentTest {
let server = spawn_server().await;
let server_config = ServerConfig::new(&server.host, "http", "ws");
let flowy_test = FlowyTest::setup_with(server_config);
init_user(&flowy_test).await;
let edit_context = create_doc(&flowy_test).await;
let user_session = flowy_test.sdk.user_session.clone();
let flowy_document = flowy_test.sdk.flowy_document.clone();
Self {
server,
flowy_test,
flowy_document,
user_session,
edit_context,
}
Self { server, flowy_test }
}
pub async fn run_scripts(self, scripts: Vec<DocScript>) {
for script in scripts {
match script {
DocScript::SendText(s) => {
let delta = Delta::from_str(s).unwrap();
let data = delta.to_json();
let doc_delta = DocDelta {
doc_id: self.edit_context.doc_id.clone(),
data,
};
self.flowy_document.apply_doc_delta(doc_delta).await;
},
DocScript::SendBinary(_bytes) => {},
}
}
std::mem::forget(self);
let mut interval = interval(Duration::from_secs(5));
interval.tick().await;
interval.tick().await;
init_user(&self.flowy_test).await;
let DocumentTest { server, flowy_test } = self;
run_scripts(server, flowy_test, scripts).await;
sleep(Duration::from_secs(5)).await;
}
}
async fn create_doc(flowy_test: &FlowyTest) -> Arc<EditDocContext> {
pub async fn run_scripts(server: TestServer, flowy_test: FlowyTest, scripts: Vec<DocScript>) {
let client_edit_context = create_doc(&flowy_test).await;
let doc_id = client_edit_context.doc_id.clone();
for script in scripts {
match script {
DocScript::SendText(index, s) => {
client_edit_context.insert(index, s);
},
DocScript::AssertClient(s) => {
let json = client_edit_context.doc_json();
assert_eq(s, &json);
},
DocScript::AssertServer(s) => {
sleep(Duration::from_millis(100)).await;
let pool = server.pg_pool.clone();
let edit_context = server
.app_ctx
.doc_biz
.manager
.get(&doc_id, Data::new(pool))
.await
.unwrap()
.unwrap();
let json = edit_context.doc_json();
assert_eq(s, &json);
},
}
}
std::mem::forget(flowy_test);
}
fn assert_eq(expect: &str, receive: &str) {
if expect != receive {
log::error!("expect: {}", expect);
log::error!("but receive: {}", receive);
}
assert_eq!(expect, receive);
}
async fn create_doc(flowy_test: &FlowyTest) -> Arc<ClientEditDocContext> {
let view_test = ViewTest::new(flowy_test).await;
let doc_id = view_test.view.id.clone();
let user_session = flowy_test.sdk.user_session.clone();

View File

@ -1,8 +1,10 @@
use backend::{
application::{get_connection_pool, Application},
config::{get_configuration, DatabaseSettings},
context::AppContext,
};
use backend::application::init_app_context;
use flowy_document::{
entities::doc::{Doc, QueryDocParams},
prelude::*,
@ -168,6 +170,7 @@ pub struct TestServer {
pub host: String,
pub port: u16,
pub pg_pool: PgPool,
pub app_ctx: AppContext,
}
pub async fn spawn_server() -> TestServer {
@ -181,7 +184,8 @@ pub async fn spawn_server() -> TestServer {
};
let _ = configure_database(&configuration.database).await;
let application = Application::build(configuration.clone())
let app_ctx = init_app_context(&configuration).await;
let application = Application::build(configuration.clone(), app_ctx.clone())
.await
.expect("Failed to build application.");
let application_port = application.port();
@ -197,6 +201,7 @@ pub async fn spawn_server() -> TestServer {
pg_pool: get_connection_pool(&configuration.database)
.await
.expect("Failed to connect to the database"),
app_ctx,
}
}