save doc delta & cache the connect user information

This commit is contained in:
appflowy
2021-09-26 20:04:47 +08:00
parent 77313ab431
commit bd4caa2e99
22 changed files with 331 additions and 533 deletions

View File

@ -1,11 +1,12 @@
use crate::service::{
doc::update_doc,
util::md5,
ws::{entities::Socket, WsMessageAdaptor},
ws::{entities::Socket, WsClientData, WsMessageAdaptor, WsUser},
};
use actix_web::web::Data;
use byteorder::{BigEndian, WriteBytesExt};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_document::{
entities::ws::{WsDataType, WsDocumentData},
protobuf::{Doc, RevType, Revision, UpdateDocParams},
@ -20,48 +21,93 @@ use flowy_ws::WsMessage;
use parking_lot::RwLock;
use protobuf::Message;
use sqlx::PgPool;
use std::{convert::TryInto, sync::Arc, time::Duration};
use std::{
convert::TryInto,
sync::{
atomic::{AtomicI64, Ordering::SeqCst},
Arc,
},
time::Duration,
};
struct EditUser {
user: Arc<WsUser>,
socket: Socket,
}
pub(crate) struct EditDocContext {
doc_id: String,
rev_id: i64,
rev_id: AtomicI64,
document: Arc<RwLock<Document>>,
pg_pool: Data<PgPool>,
users: DashMap<String, EditUser>,
}
impl EditDocContext {
pub(crate) fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
let users = DashMap::new();
Ok(Self {
doc_id: doc.id.clone(),
rev_id: doc.rev_id,
rev_id: AtomicI64::new(doc.rev_id),
document,
pg_pool,
users,
})
}
#[tracing::instrument(level = "debug", skip(self, socket, revision))]
pub(crate) async fn apply_revision(&self, socket: Socket, revision: Revision) -> Result<(), ServerError> {
#[tracing::instrument(level = "debug", skip(self, client_data, revision))]
pub(crate) async fn apply_revision(
&self,
client_data: WsClientData,
revision: Revision,
) -> Result<(), ServerError> {
let _ = self.verify_md5(&revision)?;
// Rest EditUser for each client websocket message to keep the socket available.
let user = EditUser {
user: client_data.user.clone(),
socket: client_data.socket.clone(),
};
self.users.insert(client_data.user.id().to_owned(), user);
if self.rev_id > revision.rev_id {
let (cli_prime, server_prime) = self.compose(&revision.delta).map_err(internal_error)?;
log::debug!(
"cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}",
self.rev_id.load(SeqCst),
revision.base_rev_id,
revision.rev_id
);
let cli_socket = client_data.socket;
let cur_rev_id = self.rev_id.load(SeqCst);
// Transform the revision if client rev_id less than server rev_id. Sending the
// prime delta to client.
if cur_rev_id > revision.rev_id {
let (cli_prime, server_prime) = self.transform(&revision.delta).map_err(internal_error)?;
let _ = self.update_document_delta(server_prime)?;
log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
let ws_cli_revision = mk_rev_ws_message(&self.doc_id, cli_revision);
socket.do_send(ws_cli_revision).map_err(internal_error)?;
cli_socket.do_send(ws_cli_revision).map_err(internal_error)?;
Ok(())
} else if cur_rev_id < revision.rev_id {
if cur_rev_id != revision.base_rev_id {
let missing_rev_range = revision.rev_id - cur_rev_id;
// TODO: pull the missing revs from client
} else {
let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?;
let _ = self.update_document_delta(delta)?;
cli_socket.do_send(mk_acked_ws_message(&revision));
self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
// Opti: save with multiple revisions
let _ = self.save_revision(&revision).await?;
}
Ok(())
} else {
let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?;
let _ = self.update_document_delta(delta)?;
socket.do_send(mk_acked_ws_message(&revision));
// Opti: save with multiple revisions
let _ = self.save_revision(&revision).await?;
Ok(())
log::error!("Client rev_id should not equal to server rev_id");
}
}
@ -70,7 +116,7 @@ impl EditDocContext {
let md5 = md5(&delta_data);
let revision = Revision {
base_rev_id,
rev_id: self.rev_id,
rev_id: self.rev_id.load(SeqCst),
delta: delta_data,
md5,
doc_id: self.doc_id.to_string(),
@ -81,10 +127,12 @@ impl EditDocContext {
}
#[tracing::instrument(level = "debug", skip(self, delta_data))]
fn compose(&self, delta_data: &Vec<u8>) -> Result<(Delta, Delta), OTError> {
log::debug!("{} document data: {}", self.doc_id, self.document.read().to_json());
fn transform(&self, delta_data: &Vec<u8>) -> Result<(Delta, Delta), OTError> {
log::debug!("Document: {}", self.document.read().to_json());
let doc_delta = self.document.read().delta().clone();
let cli_delta = Delta::from_bytes(delta_data)?;
log::debug!("Compose delta: {}", cli_delta);
let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?;
Ok((cli_prime, server_prime))
@ -99,8 +147,7 @@ impl EditDocContext {
},
Some(mut write_guard) => {
let _ = write_guard.compose_delta(&delta).map_err(internal_error)?;
log::debug!("Document: {}", write_guard.to_plain_string());
log::debug!("Document: {}", write_guard.to_json());
},
}
Ok(())

View File

@ -6,6 +6,7 @@ use crate::service::{
};
use actix_web::web::Data;
use crate::service::ws::WsUser;
use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData};
use flowy_net::errors::ServerError;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
@ -53,22 +54,19 @@ impl EditDocManager {
async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> {
let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?;
match document_data.ty {
WsDataType::Acked => {},
WsDataType::Rev => {
let revision: Revision = parse_from_bytes(&document_data.data)?;
let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
tokio::spawn(async move {
match edited_doc
.apply_revision(client_data.socket, revision)
.await
{
match edited_doc.apply_revision(client_data, revision).await {
Ok(_) => {},
Err(e) => log::error!("Doc apply revision failed: {:?}", e),
}
});
},
_ => {},
}
Ok(())

View File

@ -1,4 +1,4 @@
use crate::service::ws::{WsBizHandlers, WsClient, WsServer};
use crate::service::ws::{WsBizHandlers, WsClient, WsServer, WsUser};
use actix::Addr;
use crate::service::user::LoggedUser;
@ -21,7 +21,8 @@ pub async fn establish_ws_connection(
) -> Result<HttpResponse, Error> {
match LoggedUser::from_token(token.clone()) {
Ok(user) => {
let client = WsClient::new(&user.user_id, server.get_ref().clone(), biz_handlers);
let ws_user = WsUser::new(user.clone());
let client = WsClient::new(ws_user, server.get_ref().clone(), biz_handlers);
let result = ws::start(client, &request, payload);
match result {
Ok(response) => Ok(response.into()),

View File

@ -1,11 +1,14 @@
use crate::{
config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
service::ws::{
entities::{Connect, Disconnect, SessionId, Socket},
WsBizHandler,
WsBizHandlers,
WsMessageAdaptor,
WsServer,
service::{
user::LoggedUser,
ws::{
entities::{Connect, Disconnect, SessionId, Socket},
WsBizHandler,
WsBizHandlers,
WsMessageAdaptor,
WsServer,
},
},
};
use actix::*;
@ -13,28 +16,35 @@ use actix_web::web::Data;
use actix_web_actors::{ws, ws::Message::Text};
use bytes::Bytes;
use flowy_ws::WsMessage;
use std::{convert::TryFrom, time::Instant};
use std::{convert::TryFrom, sync::Arc, time::Instant};
pub struct WsUser {
inner: LoggedUser,
}
impl WsUser {
pub fn new(inner: LoggedUser) -> Self { Self { inner } }
pub fn id(&self) -> &str { &self.inner.user_id }
}
pub struct WsClientData {
pub(crate) user: Arc<WsUser>,
pub(crate) socket: Socket,
pub(crate) data: Bytes,
}
pub struct WsClient {
session_id: SessionId,
user: Arc<WsUser>,
server: Addr<WsServer>,
biz_handlers: Data<WsBizHandlers>,
hb: Instant,
}
impl WsClient {
pub fn new<T: Into<SessionId>>(
session_id: T,
server: Addr<WsServer>,
biz_handlers: Data<WsBizHandlers>,
) -> Self {
pub fn new(user: WsUser, server: Addr<WsServer>, biz_handlers: Data<WsBizHandlers>) -> Self {
Self {
session_id: session_id.into(),
user: Arc::new(user),
server,
biz_handlers,
hb: Instant::now(),
@ -45,7 +55,7 @@ impl WsClient {
ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| {
if Instant::now().duration_since(client.hb) > PING_TIMEOUT {
client.server.do_send(Disconnect {
sid: client.session_id.clone(),
sid: client.user.id().into(),
});
ctx.stop();
} else {
@ -63,6 +73,7 @@ impl WsClient {
},
Some(handler) => {
let client_data = WsClientData {
user: self.user.clone(),
socket,
data: Bytes::from(message.data),
};
@ -84,7 +95,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsClient {
self.hb = Instant::now();
},
Ok(ws::Message::Binary(bytes)) => {
log::debug!(" Receive {} binary", &self.session_id);
let socket = ctx.address().recipient();
self.handle_binary_message(bytes, socket);
},
@ -98,11 +108,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsClient {
Ok(ws::Message::Continuation(_)) => {},
Ok(ws::Message::Nop) => {},
Err(e) => {
log::error!(
"[{}]: WebSocketStream protocol error {:?}",
self.session_id,
e
);
log::error!("[{}]: WebSocketStream protocol error {:?}", self.user.id(), e);
ctx.stop();
},
}
@ -123,7 +129,7 @@ impl Actor for WsClient {
let socket = ctx.address().recipient();
let connect = Connect {
socket,
sid: self.session_id.clone(),
sid: self.user.id().into(),
};
self.server
.send(connect)
@ -141,7 +147,7 @@ impl Actor for WsClient {
fn stopping(&mut self, _: &mut Self::Context) -> Running {
self.server.do_send(Disconnect {
sid: self.session_id.clone(),
sid: self.user.id().into(),
});
Running::Stop