test sync

This commit is contained in:
appflowy 2021-12-13 22:46:35 +08:00
parent 31086ad4df
commit 5b7e6690f8
34 changed files with 595 additions and 432 deletions

View File

@ -9,7 +9,7 @@ use sqlx::{postgres::PgArguments, PgPool, Postgres};
use uuid::Uuid;
#[tracing::instrument(level = "debug", skip(transaction), err)]
pub(crate) async fn create_doc(
pub(crate) async fn create_doc_with_transaction(
transaction: &mut DBTransaction<'_>,
params: CreateDocParams,
) -> Result<(), ServerError> {
@ -23,6 +23,22 @@ pub(crate) async fn create_doc(
Ok(())
}
pub(crate) async fn create_doc(pool: &PgPool, params: CreateDocParams) -> Result<(), ServerError> {
let mut transaction = pool
.begin()
.await
.context("Failed to acquire a Postgres connection to create doc")?;
let _ = create_doc_with_transaction(&mut transaction, params).await?;
transaction
.commit()
.await
.context("Failed to commit SQL transaction to create doc.")?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(pool), err)]
pub(crate) async fn read_doc(pool: &PgPool, params: DocIdentifier) -> Result<Doc, ServerError> {
let doc_id = Uuid::parse_str(&params.doc_id)?;

View File

@ -8,14 +8,16 @@ use crate::{
};
use actix_web::web::Data;
use crate::services::doc::create_doc;
use backend_service::errors::ServerError;
use flowy_collaboration::{
core::sync::{ServerDocManager, ServerDocPersistence},
entities::doc::Doc,
errors::CollaborateError,
protobuf::{DocIdentifier, UpdateDocParams},
protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams},
};
use lib_infra::future::FutureResultSend;
use lib_ot::rich_text::RichTextDelta;
use lib_ot::{revision::Revision, rich_text::RichTextDelta};
use sqlx::PgPool;
use std::{convert::TryInto, sync::Arc};
use tokio::sync::{mpsc, oneshot};
@ -77,7 +79,7 @@ impl ServerDocPersistence for DocPersistenceImpl {
FutureResultSend::new(async move {
let _ = update_doc(pg_pool.get_ref(), params)
.await
.map_err(|e| CollaborateError::internal().context(e))?;
.map_err(server_error_to_collaborate_error)?;
Ok(())
})
}
@ -91,11 +93,40 @@ impl ServerDocPersistence for DocPersistenceImpl {
FutureResultSend::new(async move {
let mut pb_doc = read_doc(pg_pool.get_ref(), params)
.await
.map_err(|e| CollaborateError::internal().context(e))?;
.map_err(server_error_to_collaborate_error)?;
let doc = (&mut pb_doc)
.try_into()
.map_err(|e| CollaborateError::internal().context(e))?;
Ok(doc)
})
}
fn create_doc(&self, revision: Revision) -> FutureResultSend<Doc, CollaborateError> {
let pg_pool = self.0.clone();
FutureResultSend::new(async move {
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let doc_json = delta.to_json();
let params = CreateDocParams {
id: revision.doc_id.clone(),
data: doc_json.clone(),
unknown_fields: Default::default(),
cached_size: Default::default(),
};
let _ = create_doc(pg_pool.get_ref(), params)
.await
.map_err(server_error_to_collaborate_error)?;
let doc: Doc = revision.try_into()?;
Ok(doc)
})
}
}
fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
if error.is_record_not_found() {
CollaborateError::record_not_found()
} else {
CollaborateError::internal().context(error)
}
}

View File

@ -6,26 +6,14 @@ use actix_web::{
web::{Data, Payload},
HttpResponse,
};
use anyhow::Context;
use backend_service::{errors::ServerError, response::FlowyResponse};
use flowy_collaboration::protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams};
use sqlx::PgPool;
pub async fn create_handler(payload: Payload, pool: Data<PgPool>) -> Result<HttpResponse, ServerError> {
let params: CreateDocParams = parse_from_payload(payload).await?;
let mut transaction = pool
.begin()
.await
.context("Failed to acquire a Postgres connection to create doc")?;
let _ = create_doc(&mut transaction, params).await?;
transaction
.commit()
.await
.context("Failed to commit SQL transaction to create doc.")?;
let _ = create_doc(&pool, params).await?;
Ok(FlowyResponse::success().into())
}

View File

@ -10,8 +10,8 @@ use actix_web::web::Data;
use async_stream::stream;
use backend_service::errors::{internal_error, Result as DocResult, ServerError};
use flowy_collaboration::{
core::sync::{OpenDocHandle, ServerDocManager},
protobuf::{NewDocUser, WsDataType, WsDocumentData},
core::sync::ServerDocManager,
protobuf::{WsDataType, WsDocumentData},
};
use futures::stream::StreamExt;
use lib_ot::protobuf::Revision;
@ -80,32 +80,11 @@ impl DocWsActor {
match document_data.ty {
WsDataType::Acked => Ok(()),
WsDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await,
WsDataType::NewDocUser => self.add_doc_user(user, socket, data, pool).await,
WsDataType::PullRev => Ok(()),
WsDataType::Conflict => Ok(()),
}
}
async fn add_doc_user(
&self,
user: Arc<WsUser>,
socket: Socket,
data: Vec<u8>,
pg_pool: Data<PgPool>,
) -> DocResult<()> {
let doc_user = spawn_blocking(move || {
let user: NewDocUser = parse_from_bytes(&data)?;
DocResult::Ok(user)
})
.await
.map_err(internal_error)??;
if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await {
let user = Arc::new(ServerDocUser { user, socket, pg_pool });
handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?;
}
Ok(())
}
async fn apply_pushed_rev(
&self,
user: Arc<WsUser>,
@ -113,30 +92,27 @@ impl DocWsActor {
data: Vec<u8>,
pg_pool: Data<PgPool>,
) -> DocResult<()> {
let mut revision = spawn_blocking(move || {
let mut revision_pb = spawn_blocking(move || {
let revision: Revision = parse_from_bytes(&data)?;
let _ = verify_md5(&revision)?;
DocResult::Ok(revision)
})
.await
.map_err(internal_error)??;
if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await {
let user = Arc::new(ServerDocUser { user, socket, pg_pool });
let revision = (&mut revision).try_into().map_err(internal_error).unwrap();
handle.apply_revision(user, revision).await.map_err(internal_error)?;
}
Ok(())
}
let revision: lib_ot::revision::Revision = (&mut revision_pb).try_into().map_err(internal_error)?;
// Create the doc if it doesn't exist
let handler = match self.doc_manager.get(&revision.doc_id).await {
None => self
.doc_manager
.create_doc(revision.clone())
.await
.map_err(internal_error)?,
Some(handler) => handler,
};
async fn get_doc_handle(&self, doc_id: &str, _pg_pool: Data<PgPool>) -> Option<Arc<OpenDocHandle>> {
match self.doc_manager.get(doc_id).await {
Ok(Some(edit_doc)) => Some(edit_doc),
Ok(None) => None,
Err(e) => {
log::error!("{}", e);
None
},
}
let user = Arc::new(ServerDocUser { user, socket, pg_pool });
handler.apply_revision(user, revision).await.map_err(internal_error)?;
Ok(())
}
}

View File

@ -1,7 +1,7 @@
use crate::{
entities::workspace::{ViewTable, VIEW_TABLE},
services::{
doc::{create_doc, delete_doc},
doc::{create_doc_with_transaction, delete_doc},
trash::read_trash_ids,
user::LoggedUser,
view::sql_builder::*,
@ -94,7 +94,7 @@ pub(crate) async fn create_view_with_args(
let mut create_doc_params = CreateDocParams::new();
create_doc_params.set_data(view_data);
create_doc_params.set_id(view.id.clone());
let _ = create_doc(transaction, create_doc_params).await?;
let _ = create_doc_with_transaction(transaction, create_doc_params).await?;
Ok(view)
}

View File

@ -14,14 +14,12 @@ class WsDataType extends $pb.ProtobufEnum {
static const WsDataType PushRev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev');
static const WsDataType PullRev = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev');
static const WsDataType Conflict = WsDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict');
static const WsDataType NewDocUser = WsDataType._(4, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'NewDocUser');
static const $core.List<WsDataType> values = <WsDataType> [
Acked,
PushRev,
PullRev,
Conflict,
NewDocUser,
];
static final $core.Map<$core.int, WsDataType> _byValue = $pb.ProtobufEnum.initByValue(values);

View File

@ -16,12 +16,11 @@ const WsDataType$json = const {
const {'1': 'PushRev', '2': 1},
const {'1': 'PullRev', '2': 2},
const {'1': 'Conflict', '2': 3},
const {'1': 'NewDocUser', '2': 4},
],
};
/// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBADEg4KCk5ld0RvY1VzZXIQBA==');
final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBAD');
@$core.Deprecated('Use wsDocumentDataDescriptor instead')
const WsDocumentData$json = const {
'1': 'WsDocumentData',

View File

@ -55,7 +55,7 @@ impl DocController {
}
pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> {
log::debug!("Close doc {}", doc_id);
tracing::debug!("Close doc {}", doc_id);
self.open_cache.remove(doc_id);
self.ws_manager.remove_handler(doc_id);
Ok(())

View File

@ -63,7 +63,7 @@ impl ClientDocEditor {
stop_sync_tx,
});
edit_doc.notify_open_doc();
// edit_doc.notify_open_doc();
start_sync(edit_doc.clone(), ws_msg_rx, cloned_stop_sync_tx);
Ok(edit_doc)
@ -165,7 +165,7 @@ impl ClientDocEditor {
let delta_data = delta_data.to_vec();
let user_id = self.user.user_id()?;
let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local, user_id);
let _ = self.rev_manager.add_revision(&revision).await?;
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(rev_id.into())
}
@ -246,7 +246,7 @@ impl ClientDocEditor {
RevType::Remote,
user_id,
);
let _ = self.rev_manager.add_revision(&revision).await?;
let _ = self.rev_manager.add_remote_revision(&revision).await?;
// send the server_prime delta
let user_id = self.user.user_id()?;
@ -264,10 +264,8 @@ impl ClientDocEditor {
pub async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> {
match self.ws_msg_tx.send(doc_data) {
Ok(_) => {
tracing::debug!("Propagate ws message data success")
},
Err(e) => tracing::error!("Propagate ws message data failed. {}", e),
Ok(_) => {},
Err(e) => tracing::error!("❌Propagate ws message failed. {}", e),
}
Ok(())
}
@ -286,7 +284,7 @@ impl WsDocumentHandler for EditDocWsHandler {
let edit_doc = self.0.clone();
tokio::spawn(async move {
if let Err(e) = edit_doc.handle_ws_message(doc_data).await {
log::error!("{:?}", e);
tracing::error!("{:?}", e);
}
});
}

View File

@ -1,10 +1,13 @@
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt::skip)]
use crate::{errors::DocError, services::ws::DocumentWebSocket};
use flowy_collaboration::entities::doc::NewDocUser;
use futures::future::BoxFuture;
use lib_infra::retry::Action;
use lib_ot::revision::RevId;
use std::{future, sync::Arc};
#[allow(dead_code)]
pub(crate) struct OpenDocAction {
user_id: String,
rev_id: RevId,
@ -29,15 +32,16 @@ impl Action for OpenDocAction {
type Error = DocError;
fn run(&mut self) -> Self::Future {
let new_doc_user = NewDocUser {
user_id: self.user_id.clone(),
rev_id: self.rev_id.clone().into(),
doc_id: self.doc_id.clone(),
};
match self.ws.send(new_doc_user.into()) {
Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))),
Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))),
}
// let new_doc_user = NewDocUser {
// user_id: self.user_id.clone(),
// rev_id: self.rev_id.clone().into(),
// doc_id: self.doc_id.clone(),
// };
//
// match self.ws.send(new_doc_user.into()) {
// Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))),
// Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))),
// }
Box::pin(future::ready(Ok::<(), DocError>(())))
}
}

View File

@ -1,6 +1,10 @@
use crate::{
errors::{internal_error, DocError, DocResult},
services::doc::revision::RevisionServer,
services::doc::revision::{
cache::{disk::RevisionDiskCache, memory::RevisionMemoryCache},
RevisionRecord,
RevisionServer,
},
sql_tables::RevTableSql,
};
use flowy_collaboration::entities::doc::Doc;
@ -8,7 +12,7 @@ use flowy_database::ConnectionPool;
use lib_infra::future::FutureResult;
use lib_ot::{
core::{Operation, OperationTransformable},
revision::{RevState, RevType, Revision, RevisionDiskCache, RevisionMemoryCache, RevisionRange, RevisionRecord},
revision::{RevState, RevType, Revision, RevisionRange},
rich_text::RichTextDelta,
};
use std::{sync::Arc, time::Duration};
@ -53,11 +57,29 @@ impl RevisionCache {
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_revision(&self, revision: Revision) -> DocResult<()> {
pub async fn add_local_revision(&self, revision: Revision) -> DocResult<()> {
if self.memory_cache.contains(&revision.rev_id) {
return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
}
self.memory_cache.add_revision(revision.clone()).await?;
let record = RevisionRecord {
revision,
state: RevState::Local,
};
self.memory_cache.add_revision(record).await?;
self.save_revisions().await;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_remote_revision(&self, revision: Revision) -> DocResult<()> {
if self.memory_cache.contains(&revision.rev_id) {
return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
}
let record = RevisionRecord {
revision,
state: RevState::Local,
};
self.memory_cache.add_revision(record).await?;
self.save_revisions().await;
Ok(())
}
@ -68,8 +90,17 @@ impl RevisionCache {
self.save_revisions().await;
}
pub async fn query_revision(&self, rev_id: i64) -> Option<RevisionRecord> {
self.memory_cache.query_revision(&rev_id).await
pub async fn query_revision(&self, doc_id: &str, rev_id: i64) -> Option<RevisionRecord> {
match self.memory_cache.query_revision(&rev_id).await {
None => match self.dish_cache.read_revision(doc_id, rev_id) {
Ok(revision) => revision,
Err(e) => {
log::error!("query_revision error: {:?}", e);
None
},
},
Some(record) => Some(record),
}
}
async fn save_revisions(&self) {
@ -102,9 +133,15 @@ impl RevisionCache {
} else {
let doc_id = self.doc_id.clone();
let disk_cache = self.dish_cache.clone();
spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
.await
.map_err(internal_error)?
.map_err(internal_error)??;
let revisions = records
.into_iter()
.map(|record| record.revision)
.collect::<Vec<Revision>>();
Ok(revisions)
}
}
@ -126,11 +163,8 @@ impl RevisionCache {
RevType::Remote,
self.user_id.clone(),
);
let record = RevisionRecord {
revision,
state: RevState::Acked,
};
let _ = self.dish_cache.create_revisions(vec![record])?;
self.add_remote_revision(revision).await?;
Ok(doc)
}
}
@ -141,14 +175,14 @@ impl RevisionIterator for RevisionCache {
let disk_cache = self.dish_cache.clone();
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
match memory_cache.front_revision().await {
match memory_cache.front_local_revision().await {
None => {
//
match memory_cache.front_rev_id().await {
match memory_cache.front_local_rev_id().await {
None => Ok(None),
Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
None => Ok(None),
Some(revision) => Ok(Some(RevisionRecord::new(revision))),
Some(record) => Ok(Some(record)),
},
}
},
@ -166,25 +200,25 @@ async fn load_from_disk(
let doc_id = doc_id.to_owned();
let (tx, mut rx) = mpsc::channel(2);
let doc = spawn_blocking(move || {
let revisions = disk_cache.read_revisions(&doc_id)?;
if revisions.is_empty() {
let records = disk_cache.read_revisions(&doc_id)?;
if records.is_empty() {
return Err(DocError::doc_not_found().context("Local doesn't have this document"));
}
let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
let (base_rev_id, rev_id) = records.last().unwrap().revision.pair_rev_id();
let mut delta = RichTextDelta::new();
for (_, revision) in revisions.into_iter().enumerate() {
for (_, record) in records.into_iter().enumerate() {
// Opti: revision's clone may cause memory issues
match RichTextDelta::from_bytes(revision.clone().delta_data) {
match RichTextDelta::from_bytes(record.revision.clone().delta_data) {
Ok(local_delta) => {
delta = delta.compose(&local_delta)?;
match tx.blocking_send(revision) {
match tx.blocking_send(record) {
Ok(_) => {},
Err(e) => log::error!("Load document from disk error: {}", e),
Err(e) => tracing::error!("Load document from disk error: {}", e),
}
},
Err(e) => {
log::error!("Deserialize delta from revision failed: {}", e);
tracing::error!("Deserialize delta from revision failed: {}", e);
},
}
}
@ -200,13 +234,12 @@ async fn load_from_disk(
.await
.map_err(internal_error)?;
while let Some(revision) = rx.recv().await {
match memory_cache.add_revision(revision).await {
while let Some(record) = rx.recv().await {
match memory_cache.add_revision(record).await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
}
}
doc
}
@ -217,7 +250,7 @@ fn correct_delta_if_need(delta: &mut RichTextDelta) {
let data = delta.ops.last().as_ref().unwrap().get_data();
if !data.ends_with('\n') {
log::error!("The op must end with newline. Correcting it by inserting newline op");
log::error!("The op must end with newline. Correcting it by inserting newline op");
delta.ops.push(Operation::Insert("\n".into()));
}
}
@ -238,19 +271,19 @@ impl RevisionDiskCache for Persistence {
})
}
fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error> {
fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error).unwrap();
let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?;
Ok(revisions)
}
fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error> {
fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?;
Ok(some)
}
fn read_revisions(&self, doc_id: &str) -> Result<Vec<Revision>, Self::Error> {
fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?;
Ok(some)

View File

@ -0,0 +1,12 @@
use crate::services::doc::revision::RevisionRecord;
use lib_ot::revision::RevisionRange;
use std::fmt::Debug;
pub trait RevisionDiskCache: Sync + Send {
type Error: Debug;
fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error>;
fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<RevisionRecord>, Self::Error>;
fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error>;
fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error>;
}

View File

@ -0,0 +1,126 @@
use crate::services::doc::revision::RevisionRecord;
use dashmap::DashMap;
use lib_ot::{
errors::OTError,
revision::{RevState, Revision, RevisionRange},
};
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::RwLock;
pub struct RevisionMemoryCache {
revs_map: Arc<DashMap<i64, RevisionRecord>>,
local_revs: Arc<RwLock<VecDeque<i64>>>,
}
impl std::default::Default for RevisionMemoryCache {
fn default() -> Self {
let local_revs = Arc::new(RwLock::new(VecDeque::new()));
RevisionMemoryCache {
revs_map: Arc::new(DashMap::new()),
local_revs,
}
}
}
impl RevisionMemoryCache {
pub fn new() -> Self { RevisionMemoryCache::default() }
pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
// The last revision's rev_id must be greater than the new one.
if let Some(rev_id) = self.local_revs.read().await.back() {
if *rev_id >= record.revision.rev_id {
return Err(OTError::revision_id_conflict()
.context(format!("The new revision's id must be greater than {}", rev_id)));
}
}
match record.state {
RevState::Local => {
tracing::debug!("{}:add revision {}", record.revision.doc_id, record.revision.rev_id);
self.local_revs.write().await.push_back(record.revision.rev_id);
},
RevState::Acked => {},
}
self.revs_map.insert(record.revision.rev_id, record);
Ok(())
}
pub fn remove_revisions(&self, ids: Vec<i64>) { self.revs_map.retain(|k, _| !ids.contains(k)); }
pub async fn ack_revision(&self, rev_id: &i64) {
if let Some(pop_rev_id) = self.front_local_rev_id().await {
if &pop_rev_id != rev_id {
return;
}
}
match self.local_revs.write().await.pop_front() {
None => tracing::error!("❌The local_revs should not be empty"),
Some(pop_rev_id) => {
if &pop_rev_id != rev_id {
tracing::error!("The front rev_id:{} not equal to ack rev_id: {}", pop_rev_id, rev_id);
assert_eq!(&pop_rev_id, rev_id);
} else {
tracing::debug!("pop revision {}", pop_rev_id);
}
},
}
}
pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result<Vec<Revision>, OTError> {
let revs = range
.iter()
.flat_map(|rev_id| match self.revs_map.get(&rev_id) {
None => None,
Some(record) => Some(record.revision.clone()),
})
.collect::<Vec<Revision>>();
if revs.len() == range.len() as usize {
Ok(revs)
} else {
Ok(vec![])
}
}
pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
pub fn is_empty(&self) -> bool { self.revs_map.is_empty() }
pub fn revisions(&self) -> (Vec<i64>, Vec<RevisionRecord>) {
let mut records: Vec<RevisionRecord> = vec![];
let mut ids: Vec<i64> = vec![];
self.revs_map.iter().for_each(|kv| {
records.push(kv.value().clone());
ids.push(*kv.key());
});
(ids, records)
}
pub async fn query_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
self.revs_map.get(&rev_id).map(|r| r.value().clone())
}
pub async fn front_local_revision(&self) -> Option<(i64, RevisionRecord)> {
match self.local_revs.read().await.front() {
None => None,
Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) {
None => None,
Some(val) => {
tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id);
Some(val)
},
},
}
}
pub async fn front_local_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionMemoryCache {
pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
}

View File

@ -0,0 +1,8 @@
#![allow(clippy::module_inception)]
mod cache;
mod disk;
mod memory;
mod model;
pub use cache::*;
pub use model::*;

View File

@ -0,0 +1,15 @@
use lib_ot::revision::{RevState, Revision};
use tokio::sync::broadcast;
pub type RevIdReceiver = broadcast::Receiver<i64>;
pub type RevIdSender = broadcast::Sender<i64>;
#[derive(Clone)]
pub struct RevisionRecord {
pub revision: Revision,
pub state: RevState,
}
impl RevisionRecord {
pub fn ack(&mut self) { self.state = RevState::Acked; }
}

View File

@ -44,8 +44,13 @@ impl RevisionManager {
Ok(doc.delta()?)
}
pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> {
let _ = self.cache.add_revision(revision.clone()).await?;
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), DocError> {
let _ = self.cache.add_remote_revision(revision.clone()).await?;
Ok(())
}
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), DocError> {
let _ = self.cache.add_local_revision(revision.clone()).await?;
Ok(())
}

View File

@ -54,8 +54,13 @@ impl RevisionDownStream {
tokio::select! {
result = receiver.recv() => {
match result {
Some(msg) => yield msg,
None => {},
Some(msg) => {
yield msg
},
None => {
tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id);
break;
},
}
},
_ = stop_rx.recv() => {
@ -82,7 +87,7 @@ impl RevisionDownStream {
.await
.map_err(internal_error)?;
log::debug!("[RevisionDownStream]: receives new message: {:?}", ty);
tracing::debug!("[RevisionDownStream]: receives new message: {:?}", ty);
match ty {
WsDataType::PushRev => {
let _ = self.editor.handle_push_rev(bytes).await?;
@ -97,7 +102,6 @@ impl RevisionDownStream {
let _ = self.rev_manager.ack_revision(rev_id).await?;
},
WsDataType::Conflict => {},
WsDataType::NewDocUser => {},
}
Ok(())
@ -145,7 +149,7 @@ impl RevisionUpStream {
result = rx.recv() => {
match result {
Some(msg) => yield msg,
None => {},
None => break,
}
},
_ = stop_rx.recv() => {

View File

@ -1,10 +1,11 @@
use crate::{
errors::DocError,
sql_tables::{doc::RevTable, mk_revision_from_table, RevChangeset, RevTableState, RevTableType},
services::doc::revision::RevisionRecord,
sql_tables::{doc::RevTable, mk_revision_record_from_table, RevChangeset, RevTableState, RevTableType},
};
use diesel::update;
use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection};
use lib_ot::revision::{Revision, RevisionRange, RevisionRecord};
use lib_ot::revision::RevisionRange;
pub struct RevTableSql {}
@ -45,7 +46,7 @@ impl RevTableSql {
user_id: &str,
doc_id: &str,
conn: &SqliteConnection,
) -> Result<Vec<Revision>, DocError> {
) -> Result<Vec<RevisionRecord>, DocError> {
let filter = dsl::rev_table
.filter(dsl::doc_id.eq(doc_id))
.order(dsl::rev_id.asc())
@ -53,8 +54,8 @@ impl RevTableSql {
let rev_tables = filter.load::<RevTable>(conn)?;
let revisions = rev_tables
.into_iter()
.map(|table| mk_revision_from_table(user_id, table))
.collect::<Vec<Revision>>();
.map(|table| mk_revision_record_from_table(user_id, table))
.collect::<Vec<_>>();
Ok(revisions)
}
@ -63,7 +64,7 @@ impl RevTableSql {
doc_id: &str,
revision_id: &i64,
conn: &SqliteConnection,
) -> Result<Option<Revision>, DocError> {
) -> Result<Option<RevisionRecord>, DocError> {
let filter = dsl::rev_table
.filter(dsl::doc_id.eq(doc_id))
.filter(dsl::rev_id.eq(revision_id));
@ -72,7 +73,7 @@ impl RevTableSql {
if Err(diesel::NotFound) == result {
Ok(None)
} else {
Ok(Some(mk_revision_from_table(user_id, result?)))
Ok(Some(mk_revision_record_from_table(user_id, result?)))
}
}
@ -81,7 +82,7 @@ impl RevTableSql {
doc_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<Revision>, DocError> {
) -> Result<Vec<RevisionRecord>, DocError> {
let rev_tables = dsl::rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -91,8 +92,8 @@ impl RevTableSql {
let revisions = rev_tables
.into_iter()
.map(|table| mk_revision_from_table(user_id, table))
.collect::<Vec<Revision>>();
.map(|table| mk_revision_record_from_table(user_id, table))
.collect::<Vec<_>>();
Ok(revisions)
}

View File

@ -1,7 +1,7 @@
use crate::services::doc::revision::RevisionRecord;
use diesel::sql_types::Integer;
use flowy_database::schema::rev_table;
use flowy_collaboration::util::md5;
use flowy_database::schema::rev_table;
use lib_ot::revision::{RevId, RevState, RevType, Revision};
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
@ -64,9 +64,9 @@ impl std::convert::From<RevState> for RevTableState {
}
}
pub(crate) fn mk_revision_from_table(user_id: &str, table: RevTable) -> Revision {
pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevTable) -> RevisionRecord {
let md5 = md5(&table.data);
Revision {
let revision = Revision {
base_rev_id: table.base_rev_id,
rev_id: table.rev_id,
delta_data: table.data,
@ -74,6 +74,10 @@ pub(crate) fn mk_revision_from_table(user_id: &str, table: RevTable) -> Revision
doc_id: table.doc_id,
ty: table.ty.into(),
user_id: user_id.to_owned(),
};
RevisionRecord {
revision,
state: table.state.into(),
}
}

View File

@ -1,5 +1,5 @@
use flowy_test::editor::{EditorScript::*, *};
use lib_ot::{revision::RevState, rich_text::RichTextDeltaBuilder};
use lib_ot::revision::RevState;
#[tokio::test]
async fn doc_rev_state_test1() {
@ -40,12 +40,11 @@ async fn doc_rev_state_test2() {
#[tokio::test]
async fn doc_push_test() {
let delta = RichTextDeltaBuilder::new().insert("abc\n").build();
// let delta = RichTextDeltaBuilder::new().insert("abc\n").build();
let scripts = vec![
InsertText("1", 0),
InsertText("2", 1),
InsertText("3", 2),
SimulatePushRevisionMessageWithDelta(delta),
AssertJson(r#"[{"insert":"123\nabc\n"}]"#),
];
EditorTest::new().await.run_scripts(scripts).await;

View File

@ -231,21 +231,21 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x13network_state.proto\",\n\x0cNetworkState\x12\x1c\n\x02ty\x18\x01\
\x20\x01(\x0e2\x0c.NetworkTypeR\x02ty*G\n\x0bNetworkType\x12\x16\n\x12Un\
knownNetworkType\x10\0\x12\x08\n\x04Wifi\x10\x01\x12\x08\n\x04Cell\x10\
\x02\x12\x0c\n\x08Ethernet\x10\x03J\x9d\x02\n\x06\x12\x04\0\0\t\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x01\0\x03\x01\n\n\
\n\x03\x04\0\x01\x12\x03\x01\x08\x14\n\x0b\n\x04\x04\0\x02\0\x12\x03\x02\
\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x02\x04\x0f\n\x0c\n\x05\x04\
\0\x02\0\x01\x12\x03\x02\x10\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02\
\x15\x16\n\n\n\x02\x05\0\x12\x04\x04\0\t\x01\n\n\n\x03\x05\0\x01\x12\x03\
\x04\x05\x10\n\x0b\n\x04\x05\0\x02\0\x12\x03\x05\x04\x1b\n\x0c\n\x05\x05\
\0\x02\0\x01\x12\x03\x05\x04\x16\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x05\
\x19\x1a\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x06\x04\r\n\x0c\n\x05\x05\0\
\x02\x01\x01\x12\x03\x06\x04\x08\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\
\x06\x0b\x0c\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x07\x04\r\n\x0c\n\x05\x05\
\0\x02\x02\x01\x12\x03\x07\x04\x08\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\
\x07\x0b\x0c\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x08\x04\x11\n\x0c\n\x05\
\x05\0\x02\x03\x01\x12\x03\x08\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\x12\
\x03\x08\x0f\x10b\x06proto3\
\x02\x12\x0c\n\x08Ethernet\x10\x03J\x9d\x02\n\x06\x12\x04\0\0\n\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\
\n\x03\x04\0\x01\x12\x03\x02\x08\x14\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\
\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\x0f\n\x0c\n\x05\x04\
\0\x02\0\x01\x12\x03\x03\x10\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\
\x15\x16\n\n\n\x02\x05\0\x12\x04\x05\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\
\x05\x05\x10\n\x0b\n\x04\x05\0\x02\0\x12\x03\x06\x04\x1b\n\x0c\n\x05\x05\
\0\x02\0\x01\x12\x03\x06\x04\x16\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x06\
\x19\x1a\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x07\x04\r\n\x0c\n\x05\x05\0\
\x02\x01\x01\x12\x03\x07\x04\x08\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\
\x07\x0b\x0c\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x08\x04\r\n\x0c\n\x05\x05\
\0\x02\x02\x01\x12\x03\x08\x04\x08\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\
\x08\x0b\x0c\n\x0b\n\x04\x05\0\x02\x03\x12\x03\t\x04\x11\n\x0c\n\x05\x05\
\0\x02\x03\x01\x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\t\
\x0f\x10b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -1,4 +1,5 @@
syntax = "proto3";
message NetworkState {
NetworkType ty = 1;
}

View File

@ -76,7 +76,7 @@ impl EditorTest {
self.editor.redo().await.unwrap();
},
EditorScript::AssertRevisionState(rev_id, state) => {
let record = cache.query_revision(rev_id).await.unwrap();
let record = cache.query_revision(&doc_id, rev_id).await.unwrap();
assert_eq!(record.state, state);
},
EditorScript::AssertCurrentRevId(rev_id) => {

View File

@ -7,7 +7,7 @@ use dashmap::DashMap;
use flowy_collaboration::{
core::sync::{ServerDocManager, ServerDocPersistence},
entities::{
doc::{Doc, NewDocUser},
doc::Doc,
ws::{WsDataType, WsDocumentData},
},
errors::CollaborateError,
@ -16,9 +16,13 @@ use lazy_static::lazy_static;
use lib_infra::future::{FutureResult, FutureResultSend};
use lib_ot::{revision::Revision, rich_text::RichTextDelta};
use lib_ws::{WsConnectState, WsMessage, WsMessageHandler, WsModule};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver};
use flowy_collaboration::core::sync::{RevisionUser, SyncResponse};
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use tokio::sync::{broadcast, broadcast::Receiver, mpsc};
pub struct MockWebSocket {
handlers: DashMap<WsModule, Arc<dyn WsMessageHandler>>,
@ -49,12 +53,11 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
tokio::spawn(async move {
while let Ok(message) = ws_receiver.recv().await {
let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap();
match DOC_SERVER.handle_ws_data(ws_data).await {
None => {},
Some(new_ws_message) => match cloned_ws.handlers.get(&new_ws_message.module) {
let mut rx = DOC_SERVER.handle_ws_data(ws_data).await;
let new_ws_message = rx.recv().await.unwrap();
match cloned_ws.handlers.get(&new_ws_message.module) {
None => log::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()),
},
}
}
});
@ -88,38 +91,131 @@ struct MockDocServer {
impl std::default::Default for MockDocServer {
fn default() -> Self {
let manager = Arc::new(ServerDocManager::new(Arc::new(MockDocServerPersistence {})));
let persistence = Arc::new(MockDocServerPersistence::default());
let manager = Arc::new(ServerDocManager::new(persistence));
MockDocServer { manager }
}
}
impl MockDocServer {
async fn handle_ws_data(&self, ws_data: WsDocumentData) -> Option<WsMessage> {
async fn handle_ws_data(&self, ws_data: WsDocumentData) -> mpsc::Receiver<WsMessage> {
let bytes = Bytes::from(ws_data.data);
match ws_data.ty {
WsDataType::Acked => {},
WsDataType::Acked => {
unimplemented!()
},
WsDataType::PushRev => {
let revision = Revision::try_from(bytes).unwrap();
log::info!("{:?}", revision);
let handler = match self.manager.get(&revision.doc_id).await {
None => self.manager.create_doc(revision.clone()).await.unwrap(),
Some(handler) => handler,
};
let (tx, rx) = mpsc::channel(1);
let user = MockDocUser {
user_id: revision.user_id.clone(),
tx,
};
handler.apply_revision(Arc::new(user), revision).await.unwrap();
rx
},
WsDataType::PullRev => {},
WsDataType::Conflict => {},
WsDataType::NewDocUser => {
let new_doc_user = NewDocUser::try_from(bytes).unwrap();
log::info!("{:?}", new_doc_user);
// NewDocUser
WsDataType::PullRev => {
unimplemented!()
},
WsDataType::Conflict => {
unimplemented!()
},
}
None
}
}
struct MockDocServerPersistence {}
struct MockDocServerPersistence {
inner: Arc<DashMap<String, Doc>>,
}
impl std::default::Default for MockDocServerPersistence {
fn default() -> Self {
MockDocServerPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl ServerDocPersistence for MockDocServerPersistence {
fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> {
fn update_doc(&self, _doc_id: &str, _rev_id: i64, _delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> {
unimplemented!()
}
fn read_doc(&self, doc_id: &str) -> FutureResultSend<Doc, CollaborateError> { unimplemented!() }
fn read_doc(&self, doc_id: &str) -> FutureResultSend<Doc, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
FutureResultSend::new(async move {
match inner.get(&doc_id) {
None => {
//
Err(CollaborateError::record_not_found())
},
Some(val) => {
//
Ok(val.value().clone())
},
}
})
}
fn create_doc(&self, revision: Revision) -> FutureResultSend<Doc, CollaborateError> {
FutureResultSend::new(async move {
let doc: Doc = revision.try_into().unwrap();
Ok(doc)
})
}
}
#[derive(Debug)]
struct MockDocUser {
user_id: String,
tx: mpsc::Sender<WsMessage>,
}
impl RevisionUser for MockDocUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn recv(&self, resp: SyncResponse) {
let sender = self.tx.clone();
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::NewRevision {
rev_id: _,
doc_id: _,
doc_json: _,
} => {
// unimplemented!()
},
}
});
}
}

View File

@ -23,6 +23,7 @@ use tokio::{
pub trait ServerDocPersistence: Send + Sync {
fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError>;
fn read_doc(&self, doc_id: &str) -> FutureResultSend<Doc, CollaborateError>;
fn create_doc(&self, revision: Revision) -> FutureResultSend<Doc, CollaborateError>;
}
#[rustfmt::skip]
@ -59,15 +60,36 @@ impl ServerDocManager {
}
}
pub async fn get(&self, doc_id: &str) -> Result<Option<Arc<OpenDocHandle>>, CollaborateError> {
pub async fn get(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
match self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) {
Some(edit_doc) => Ok(Some(edit_doc)),
Some(edit_doc) => Some(edit_doc),
None => {
let f = || async {
let doc = self.persistence.read_doc(doc_id).await?;
let handler = self.cache(doc).await.map_err(internal_error)?;
Ok(Some(handler))
Ok::<Arc<OpenDocHandle>, CollaborateError>(handler)
};
match f().await {
Ok(handler) => Some(handler),
Err(e) => {
log::error!("{}", e);
None
},
}
},
}
}
pub async fn create_doc(&self, revision: Revision) -> Result<Arc<OpenDocHandle>, CollaborateError> {
if !revision.is_initial() {
return Err(
CollaborateError::revision_conflict().context("Revision's rev_id should be 0 when creating the doc")
);
}
let doc = self.persistence.create_doc(revision).await?;
let handler = self.cache(doc).await?;
Ok(handler)
}
async fn cache(&self, doc: Doc) -> Result<Arc<OpenDocHandle>, CollaborateError> {
@ -93,13 +115,6 @@ impl OpenDocHandle {
Ok(Self { sender })
}
pub async fn add_user(&self, user: Arc<dyn RevisionUser>, rev_id: i64) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let msg = DocCommand::NewConnectedUser { user, rev_id, ret };
let _ = self.send(msg, rx).await?;
Ok(())
}
pub async fn apply_revision(
&self,
user: Arc<dyn RevisionUser>,
@ -132,11 +147,6 @@ impl OpenDocHandle {
#[derive(Debug)]
enum DocCommand {
NewConnectedUser {
user: Arc<dyn RevisionUser>,
rev_id: i64,
ret: oneshot::Sender<CollaborateResult<()>>,
},
ReceiveRevision {
user: Arc<dyn RevisionUser>,
revision: Revision,
@ -183,10 +193,6 @@ impl DocCommandQueue {
async fn handle_message(&self, msg: DocCommand) {
match msg {
DocCommand::NewConnectedUser { user, rev_id, ret } => {
log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id);
let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await.map_err(internal_error));
},
DocCommand::ReceiveRevision { user, revision, ret } => {
// let revision = (&mut revision).try_into().map_err(internal_error).unwrap();
let _ = ret.send(
@ -247,20 +253,6 @@ impl ServerDocEditor {
})
}
#[tracing::instrument(
level = "debug",
skip(self, user),
fields(
user_id = %user.user_id(),
rev_id = %rev_id,
)
)]
pub async fn new_doc_user(&self, user: Arc<dyn RevisionUser>, rev_id: i64) -> Result<(), OTError> {
self.users.insert(user.user_id(), user.clone());
self.synchronizer.new_conn(user, rev_id);
Ok(())
}
#[tracing::instrument(
level = "debug",
skip(self, user, revision),

View File

@ -55,29 +55,12 @@ impl RevisionSynchronizer {
}
}
pub fn new_conn(&self, user: Arc<dyn RevisionUser>, rev_id: i64) {
let cur_rev_id = self.rev_id.load(SeqCst);
match cur_rev_id.cmp(&rev_id) {
Ordering::Less => {
let msg = mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id);
user.recv(SyncResponse::Pull(msg));
},
Ordering::Equal => {},
Ordering::Greater => {
let doc_delta = self.document.read().delta().clone();
let revision = self.mk_revision(rev_id, doc_delta);
let data = mk_push_message(&self.doc_id, revision);
user.recv(SyncResponse::Push(data));
},
}
}
pub fn apply_revision(&self, user: Arc<dyn RevisionUser>, revision: Revision) -> Result<(), OTError> {
let cur_rev_id = self.rev_id.load(SeqCst);
match cur_rev_id.cmp(&revision.rev_id) {
let server_base_rev_id = self.rev_id.load(SeqCst);
match server_base_rev_id.cmp(&revision.rev_id) {
Ordering::Less => {
let next_rev_id = next(cur_rev_id);
if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id {
let server_rev_id = next(server_base_rev_id);
if server_base_rev_id == revision.base_rev_id || server_rev_id == revision.rev_id {
// The rev is in the right order, just compose it.
let _ = self.compose_revision(&revision)?;
user.recv(SyncResponse::Ack(mk_acked_message(&revision)));
@ -91,13 +74,14 @@ impl RevisionSynchronizer {
});
} else {
// The server document is outdated, pull the missing revision from the client.
let msg = mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id);
let msg = mk_pull_message(&self.doc_id, server_rev_id, revision.rev_id);
user.recv(SyncResponse::Pull(msg));
}
},
Ordering::Equal => {
// Do nothing
log::warn!("Applied revision rev_id is the same as cur_rev_id");
user.recv(SyncResponse::Ack(mk_acked_message(&revision)));
},
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then

View File

@ -1,5 +1,6 @@
use crate::errors::CollaborateError;
use flowy_derive::ProtoBuf;
use lib_ot::{errors::OTError, rich_text::RichTextDelta};
use lib_ot::{errors::OTError, revision::Revision, rich_text::RichTextDelta};
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct CreateDocParams {
@ -41,6 +42,28 @@ impl Doc {
}
}
impl std::convert::TryFrom<Revision> for Doc {
type Error = CollaborateError;
fn try_from(revision: Revision) -> Result<Self, Self::Error> {
if !revision.is_initial() {
return Err(
CollaborateError::revision_conflict().context("Revision's rev_id should be 0 when creating the doc")
);
}
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let doc_json = delta.to_json();
Ok(Doc {
id: revision.doc_id,
data: doc_json,
rev_id: revision.rev_id,
base_rev_id: revision.base_rev_id,
})
}
}
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct UpdateDocParams {
#[pb(index = 1)]

View File

@ -1,4 +1,4 @@
use crate::{entities::doc::NewDocUser, errors::CollaborateError};
use crate::errors::CollaborateError;
use bytes::Bytes;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use lib_ot::revision::{RevId, Revision, RevisionRange};
@ -13,7 +13,6 @@ pub enum WsDataType {
// The fronted receives the PullRev event means the backend try to pull the revision from frontend
PullRev = 2,
Conflict = 3,
NewDocUser = 4,
}
impl WsDataType {
@ -53,18 +52,6 @@ impl std::convert::From<Revision> for WsDocumentData {
}
}
impl std::convert::From<NewDocUser> for WsDocumentData {
fn from(user: NewDocUser) -> Self {
let doc_id = user.doc_id.clone();
let bytes: Bytes = user.try_into().unwrap();
Self {
doc_id,
ty: WsDataType::NewDocUser,
data: bytes.to_vec(),
}
}
}
pub struct WsDocumentDataBuilder();
impl WsDocumentDataBuilder {
// WsDataType::PushRev -> Revision

View File

@ -38,6 +38,8 @@ impl CollaborateError {
static_doc_error!(undo, ErrorCode::UndoFail);
static_doc_error!(redo, ErrorCode::RedoFail);
static_doc_error!(out_of_bound, ErrorCode::OutOfBound);
static_doc_error!(record_not_found, ErrorCode::RecordNotFound);
static_doc_error!(revision_conflict, ErrorCode::RevisionConflict);
}
impl fmt::Display for CollaborateError {
@ -51,6 +53,8 @@ pub enum ErrorCode {
UndoFail = 200,
RedoFail = 201,
OutOfBound = 202,
RevisionConflict = 203,
RecordNotFound = 300,
InternalError = 1000,
}

View File

@ -261,7 +261,6 @@ pub enum WsDataType {
PushRev = 1,
PullRev = 2,
Conflict = 3,
NewDocUser = 4,
}
impl ::protobuf::ProtobufEnum for WsDataType {
@ -275,7 +274,6 @@ impl ::protobuf::ProtobufEnum for WsDataType {
1 => ::std::option::Option::Some(WsDataType::PushRev),
2 => ::std::option::Option::Some(WsDataType::PullRev),
3 => ::std::option::Option::Some(WsDataType::Conflict),
4 => ::std::option::Option::Some(WsDataType::NewDocUser),
_ => ::std::option::Option::None
}
}
@ -286,7 +284,6 @@ impl ::protobuf::ProtobufEnum for WsDataType {
WsDataType::PushRev,
WsDataType::PullRev,
WsDataType::Conflict,
WsDataType::NewDocUser,
];
values
}
@ -317,31 +314,28 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x08ws.proto\"X\n\x0eWsDocumentData\x12\x15\n\x06doc_id\x18\x01\x20\
\x01(\tR\x05docId\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\
\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*O\n\nWsDataType\
\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*?\n\nWsDataType\
\x12\t\n\x05Acked\x10\0\x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRe\
v\x10\x02\x12\x0c\n\x08Conflict\x10\x03\x12\x0e\n\nNewDocUser\x10\x04J\
\xb4\x03\n\x06\x12\x04\0\0\r\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\
\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\
\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\
\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\
\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\
\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\
\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\
\x03\x12\x03\x04\x14\x15\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\
\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\
\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\
\n\n\n\x02\x05\0\x12\x04\x07\0\r\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\
\x0f\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\
\x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\
\x0b\n\x04\x05\0\x02\x01\x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\
\x12\x03\t\x04\x0b\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\
\n\x04\x05\0\x02\x02\x12\x03\n\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\
\x03\n\x04\x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\
\x04\x05\0\x02\x03\x12\x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\
\x03\x0b\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10\n\x0b\
\n\x04\x05\0\x02\x04\x12\x03\x0c\x04\x13\n\x0c\n\x05\x05\0\x02\x04\x01\
\x12\x03\x0c\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x0c\x11\x12b\
\x06proto3\
v\x10\x02\x12\x0c\n\x08Conflict\x10\x03J\x8b\x03\n\x06\x12\x04\0\0\x0c\
\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\
\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\
\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\
\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\
\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\
\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\
\x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\
\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\
\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\
\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\
\x07\0\x0c\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\
\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\
\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\
\x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x0b\n\x0c\
\n\x05\x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\
\x03\n\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\n\x04\x0b\n\x0c\n\
\x05\x05\0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\
\x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x0b\x04\x0c\n\x0c\
\n\x05\x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -10,5 +10,4 @@ enum WsDataType {
PushRev = 1;
PullRev = 2;
Conflict = 3;
NewDocUser = 4;
}

View File

@ -1,140 +0,0 @@
use crate::{
errors::OTError,
revision::{Revision, RevisionRange},
};
use dashmap::DashMap;
use std::{collections::VecDeque, fmt::Debug, sync::Arc};
use tokio::sync::{broadcast, RwLock};
pub trait RevisionDiskCache: Sync + Send {
type Error: Debug;
fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error>;
fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error>;
fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error>;
fn read_revisions(&self, doc_id: &str) -> Result<Vec<Revision>, Self::Error>;
}
pub struct RevisionMemoryCache {
revs_map: Arc<DashMap<i64, RevisionRecord>>,
pending_revs: Arc<RwLock<VecDeque<i64>>>,
}
impl std::default::Default for RevisionMemoryCache {
fn default() -> Self {
let pending_revs = Arc::new(RwLock::new(VecDeque::new()));
RevisionMemoryCache {
revs_map: Arc::new(DashMap::new()),
pending_revs,
}
}
}
impl RevisionMemoryCache {
pub fn new() -> Self { RevisionMemoryCache::default() }
pub async fn add_revision(&self, revision: Revision) -> Result<(), OTError> {
// The last revision's rev_id must be greater than the new one.
if let Some(rev_id) = self.pending_revs.read().await.back() {
if *rev_id >= revision.rev_id {
return Err(OTError::revision_id_conflict()
.context(format!("The new revision's id must be greater than {}", rev_id)));
}
}
self.pending_revs.write().await.push_back(revision.rev_id);
self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision));
Ok(())
}
pub fn remove_revisions(&self, ids: Vec<i64>) { self.revs_map.retain(|k, _| !ids.contains(k)); }
pub async fn ack_revision(&self, rev_id: &i64) {
if let Some(mut m_revision) = self.revs_map.get_mut(rev_id) {
m_revision.value_mut().ack();
match self.pending_revs.write().await.pop_front() {
None => log::error!("The pending_revs should not be empty"),
Some(cache_rev_id) => {
assert_eq!(&cache_rev_id, rev_id);
},
}
} else {
log::error!("Can't find revision with id {}", rev_id);
}
}
pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result<Vec<Revision>, OTError> {
let revs = range
.iter()
.flat_map(|rev_id| match self.revs_map.get(&rev_id) {
None => None,
Some(rev) => Some(rev.revision.clone()),
})
.collect::<Vec<Revision>>();
if revs.len() == range.len() as usize {
Ok(revs)
} else {
Ok(vec![])
}
}
pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
pub fn is_empty(&self) -> bool { self.revs_map.is_empty() }
pub fn revisions(&self) -> (Vec<i64>, Vec<RevisionRecord>) {
let mut records: Vec<RevisionRecord> = vec![];
let mut ids: Vec<i64> = vec![];
self.revs_map.iter().for_each(|kv| {
records.push(kv.value().clone());
ids.push(*kv.key());
});
(ids, records)
}
pub async fn query_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
self.revs_map.get(&rev_id).map(|r| r.value().clone())
}
pub async fn front_revision(&self) -> Option<(i64, RevisionRecord)> {
match self.pending_revs.read().await.front() {
None => None,
Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
}
}
pub async fn front_rev_id(&self) -> Option<i64> { self.pending_revs.read().await.front().copied() }
}
pub type RevIdReceiver = broadcast::Receiver<i64>;
pub type RevIdSender = broadcast::Sender<i64>;
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevState {
Local = 0,
Acked = 1,
}
#[derive(Clone)]
pub struct RevisionRecord {
pub revision: Revision,
pub state: RevState,
}
impl RevisionRecord {
pub fn new(revision: Revision) -> Self {
Self {
revision,
state: RevState::Local,
}
}
pub fn ack(&mut self) { self.state = RevState::Acked; }
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionMemoryCache {
pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.pending_revs.clone() }
}

View File

@ -1,5 +1,3 @@
mod cache;
mod model;
pub use cache::*;
pub use model::*;

View File

@ -31,6 +31,8 @@ impl Revision {
pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) }
pub fn is_initial(&self) -> bool { self.rev_id == 0 }
// pub fn from_pb(pb: &mut crate::protobuf::Revision) -> Self {
// pb.try_into().unwrap() }
@ -155,3 +157,9 @@ pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
let md5 = format!("{:x}", md5::compute(data));
md5
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevState {
Local = 0,
Acked = 1,
}