appflowy 2022-01-01 16:53:12 +08:00
parent a654623c12
commit df5266d7c9
9 changed files with 41 additions and 24 deletions

View File

@ -10,7 +10,7 @@ use backend_service::errors::{invalid_params, ServerError};
use bytes::Bytes; use bytes::Bytes;
use chrono::Utc; use chrono::Utc;
use flowy_collaboration::{ use flowy_collaboration::{
entities::revision::{RepeatedRevision, RevType, Revision}, entities::revision::{RepeatedRevision, Revision},
protobuf::CreateDocParams, protobuf::CreateDocParams,
}; };
use flowy_core_data_model::{ use flowy_core_data_model::{

View File

@ -5,7 +5,7 @@ use flowy_collaboration::{
document::{Document, PlainDoc}, document::{Document, PlainDoc},
entities::{ entities::{
doc::{CreateDocParams, DocumentId}, doc::{CreateDocParams, DocumentId},
revision::{md5, RepeatedRevision, RevType, Revision}, revision::{md5, RepeatedRevision, Revision},
}, },
}; };
use flowy_core_data_model::entities::{ use flowy_core_data_model::entities::{

View File

@ -3,6 +3,7 @@ import 'package:dartz/dartz.dart';
import 'package:flowy_log/flowy_log.dart'; import 'package:flowy_log/flowy_log.dart';
// ignore: unnecessary_import // ignore: unnecessary_import
import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart'; import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart';
import 'package:flowy_sdk/protobuf/dart-ffi/ffi_request.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-net/event.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-net/event.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart';

View File

@ -9,7 +9,7 @@ use crate::{
use bytes::Bytes; use bytes::Bytes;
use flowy_collaboration::{ use flowy_collaboration::{
document::history::UndoResult, document::history::UndoResult,
entities::revision::{RevId, RevType, Revision}, entities::revision::{RevId, Revision},
errors::CollaborateResult, errors::CollaborateResult,
}; };
use flowy_database::ConnectionPool; use flowy_database::ConnectionPool;

View File

@ -6,20 +6,16 @@ use crate::{
}, },
sql_tables::{RevisionChangeset, RevisionTableState}, sql_tables::{RevisionChangeset, RevisionTableState},
}; };
use dashmap::DashMap;
use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState}; use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
use flowy_database::ConnectionPool; use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyResult}; use flowy_error::{internal_error, FlowyResult};
use lib_infra::future::FutureResult;
use lib_ot::errors::OTError; use std::sync::{
use std::{ atomic::{AtomicI64, Ordering::SeqCst},
collections::VecDeque, Arc,
sync::{
atomic::{AtomicI64, Ordering::SeqCst},
Arc,
},
}; };
use tokio::{sync::RwLock, task::spawn_blocking}; use tokio::task::spawn_blocking;
pub struct RevisionCache { pub struct RevisionCache {
doc_id: String, doc_id: String,
@ -41,12 +37,21 @@ impl RevisionCache {
} }
} }
pub async fn add(&self, revision: Revision, state: RevisionState) -> FlowyResult<RevisionRecord> { pub async fn add(
&self,
revision: Revision,
state: RevisionState,
write_to_disk: bool,
) -> FlowyResult<RevisionRecord> {
if self.memory_cache.contains(&revision.rev_id) { if self.memory_cache.contains(&revision.rev_id) {
return Err(FlowyError::internal().context(format!("Duplicate remote revision id: {}", revision.rev_id))); return Err(FlowyError::internal().context(format!("Duplicate remote revision id: {}", revision.rev_id)));
} }
let rev_id = revision.rev_id; let rev_id = revision.rev_id;
let record = RevisionRecord { revision, state }; let record = RevisionRecord {
revision,
state,
write_to_disk,
};
self.memory_cache.add(&record).await; self.memory_cache.add(&record).await;
self.set_latest_rev_id(rev_id); self.set_latest_rev_id(rev_id);
Ok(record) Ok(record)
@ -111,6 +116,7 @@ impl RevisionCache {
.map(|revision| RevisionRecord { .map(|revision| RevisionRecord {
revision, revision,
state: RevisionState::Local, state: RevisionState::Local,
write_to_disk: true,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -128,9 +134,13 @@ impl RevisionCache {
} }
impl RevisionMemoryCacheDelegate for Arc<Persistence> { impl RevisionMemoryCacheDelegate for Arc<Persistence> {
fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()> { fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
let conn = &*self.pool.get().map_err(internal_error)?; let conn = &*self.pool.get().map_err(internal_error)?;
self.write_revision_records(records, &conn) records.retain(|record| record.write_to_disk);
if !records.is_empty() {
let _ = self.write_revision_records(records, &conn)?;
}
Ok(())
} }
fn receive_ack(&self, doc_id: &str, rev_id: i64) { fn receive_ack(&self, doc_id: &str, rev_id: i64) {
@ -150,6 +160,7 @@ impl RevisionMemoryCacheDelegate for Arc<Persistence> {
pub struct RevisionRecord { pub struct RevisionRecord {
pub revision: Revision, pub revision: Revision,
pub state: RevisionState, pub state: RevisionState,
pub write_to_disk: bool,
} }
impl RevisionRecord { impl RevisionRecord {

View File

@ -7,7 +7,7 @@ use dashmap::DashMap;
use flowy_collaboration::{ use flowy_collaboration::{
entities::{ entities::{
doc::DocumentInfo, doc::DocumentInfo,
revision::{RepeatedRevision, RevType, Revision, RevisionRange, RevisionState}, revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
}, },
util::{md5, RevIdCounter}, util::{md5, RevIdCounter},
}; };
@ -68,13 +68,13 @@ impl RevisionManager {
#[tracing::instrument(level = "debug", skip(self, revision))] #[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> { pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
self.rev_id_counter.set(revision.rev_id); self.rev_id_counter.set(revision.rev_id);
let _ = self.cache.add(revision.clone(), RevisionState::Ack).await?; let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
Ok(()) Ok(())
} }
#[tracing::instrument(level = "debug", skip(self, revision))] #[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> { pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
let record = self.cache.add(revision.clone(), RevisionState::Local).await?; let record = self.cache.add(revision.clone(), RevisionState::Local, true).await?;
self.sync_seq.add_revision(record).await?; self.sync_seq.add_revision(record).await?;
Ok(()) Ok(())
} }
@ -206,14 +206,18 @@ impl RevisionLoader {
&self.user_id, &self.user_id,
doc_md5, doc_md5,
); );
let _ = self.cache.add(revision.clone(), RevisionState::Ack).await?; let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
revisions = vec![revision]; revisions = vec![revision];
} else { } else {
for record in &records { for record in &records {
match record.state { match record.state {
RevisionState::Local => { RevisionState::Local => {
// //
match self.cache.add(record.revision.clone(), RevisionState::Local).await { match self
.cache
.add(record.revision.clone(), RevisionState::Local, false)
.await
{
Ok(_) => {}, Ok(_) => {},
Err(e) => tracing::error!("{}", e), Err(e) => tracing::error!("{}", e),
} }

View File

@ -10,7 +10,7 @@ use crate::services::doc::{
use bytes::Bytes; use bytes::Bytes;
use flowy_collaboration::{ use flowy_collaboration::{
entities::{ entities::{
revision::{RepeatedRevision, RevType, Revision, RevisionRange}, revision::{RepeatedRevision, Revision, RevisionRange},
ws::{DocumentClientWSData, NewDocumentUser}, ws::{DocumentClientWSData, NewDocumentUser},
}, },
errors::CollaborateResult, errors::CollaborateResult,

View File

@ -80,6 +80,7 @@ pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevisionTable)
RevisionRecord { RevisionRecord {
revision, revision,
state: table.state.into(), state: table.state.into(),
write_to_disk: false,
} }
} }

View File

@ -98,7 +98,7 @@ impl CreateViewParams {
} }
} }
pub fn take_view_data(&mut self) -> String { ::std::mem::replace(&mut self.view_data, String::new()) } pub fn take_view_data(&mut self) -> String { std::mem::take(&mut self.view_data) }
} }
impl TryInto<CreateViewParams> for CreateViewRequest { impl TryInto<CreateViewParams> for CreateViewRequest {