rm unuse code & rename folder -> client_folder

This commit is contained in:
appflowy 2022-01-24 21:17:31 +08:00
parent f90e3b63cd
commit d97abcc99f
15 changed files with 20 additions and 257 deletions

View File

@ -135,7 +135,7 @@ fn user_scope() -> Scope {
}
pub async fn init_app_context(configuration: &Settings) -> AppContext {
let level = std::env::var("RUST_LOG").unwrap_or("info".to_owned());
let level = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned());
let _ = crate::services::log::Builder::new("flowy-server")
.env_filter(&level)
.build();

View File

@ -15,7 +15,7 @@ CARGO_MAKE_EXTEND_WORKSPACE_MAKEFILE = true
CARGO_MAKE_CRATE_FS_NAME = "dart_ffi"
CARGO_MAKE_CRATE_NAME = "dart-ffi"
VERSION = "0.0.2"
FEATURES = "flutter"
FEATURES = "flutter,http_server"
PRODUCT_NAME = "AppFlowy"
#CRATE_TYPE: https://doc.rust-lang.org/reference/linkage.html
CRATE_TYPE = "staticlib"

View File

@ -5,7 +5,7 @@ use flowy_core_data_model::user_default;
use flowy_sync::RevisionWebSocket;
use lazy_static::lazy_static;
use flowy_collaboration::{entities::ws_data::ServerRevisionWSData, folder::FolderPad};
use flowy_collaboration::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
use flowy_document::FlowyDocumentManager;
use std::{collections::HashMap, convert::TryInto, fmt::Formatter, sync::Arc};

View File

@ -1,7 +1,7 @@
use crate::services::web_socket::make_folder_ws_manager;
use flowy_collaboration::{
client_folder::{FolderChange, FolderPad},
entities::{revision::Revision, ws_data::ServerRevisionWSData},
folder::{FolderChange, FolderPad},
};
use crate::controller::FolderId;

View File

@ -2,7 +2,7 @@ use crate::{
module::WorkspaceDatabase,
services::persistence::{AppTableSql, TrashTableSql, ViewTableSql, WorkspaceTableSql},
};
use flowy_collaboration::{entities::revision::md5, folder::FolderPad};
use flowy_collaboration::{client_folder::FolderPad, entities::revision::md5};
use flowy_core_data_model::entities::{
app::{App, RepeatedApp},
view::{RepeatedView, View},

View File

@ -3,8 +3,8 @@ pub mod version_1;
mod version_2;
use flowy_collaboration::{
client_folder::FolderPad,
entities::revision::{Revision, RevisionState},
folder::FolderPad,
};
use std::sync::Arc;
use tokio::sync::RwLock;

View File

@ -1,11 +1,11 @@
use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS;
use bytes::Bytes;
use flowy_collaboration::{
client_folder::FolderPad,
entities::{
revision::RevisionRange,
ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
},
folder::FolderPad,
};
use flowy_error::FlowyError;
use flowy_sync::*;

View File

@ -200,17 +200,19 @@ impl RevisionWSStream {
async fn handle_message(&self, msg: ServerRevisionWSData) -> FlowyResult<()> {
let ServerRevisionWSData { object_id, ty, data } = msg;
let bytes = Bytes::from(data);
tracing::trace!("[{}]: new message: {}:{:?}", self, object_id, ty);
match ty {
ServerRevisionWSDataType::ServerPushRev => {
tracing::trace!("[{}]: new push revision: {}:{:?}", self, object_id, ty);
let _ = self.consumer.receive_push_revision(bytes).await?;
}
ServerRevisionWSDataType::ServerPullRev => {
let range = RevisionRange::try_from(bytes)?;
tracing::trace!("[{}]: new pull: {}:{}-{:?}", self, object_id, range, ty);
let _ = self.consumer.pull_revisions_in_range(range).await?;
}
ServerRevisionWSDataType::ServerAck => {
let rev_id = RevId::try_from(bytes).unwrap().value;
tracing::trace!("[{}]: new ack: {}:{}-{:?}", self, object_id, rev_id, ty);
let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await;
}
ServerRevisionWSDataType::UserConnect => {

View File

@ -27,5 +27,3 @@ parking_lot = "0.11"
dashmap = "4.0"
futures = "0.3.15"
async-stream = "0.3.2"
[dev-dependencies]

View File

@ -1,7 +1,7 @@
use crate::{
client_folder::{default_folder_delta, FolderPad},
entities::revision::Revision,
errors::{CollaborateError, CollaborateResult},
folder::{default_folder_delta, FolderPad},
};
use flowy_core_data_model::entities::{trash::Trash, workspace::Workspace};
use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder};

View File

@ -1,10 +1,10 @@
use crate::{
client_folder::builder::FolderPadBuilder,
entities::{
folder_info::FolderDelta,
revision::{md5, Revision},
},
errors::{CollaborateError, CollaborateResult},
folder::builder::FolderPadBuilder,
};
use dissimilar::*;
use flowy_core_data_model::entities::{app::App, trash::Trash, view::View, workspace::Workspace};
@ -401,7 +401,7 @@ fn cal_diff(old: String, new: String) -> Delta<PlainTextAttributes> {
#[cfg(test)]
mod tests {
#![allow(clippy::all)]
use crate::{entities::folder_info::FolderDelta, folder::folder_pad::FolderPad};
use crate::{client_folder::folder_pad::FolderPad, entities::folder_info::FolderDelta};
use chrono::Utc;
use flowy_core_data_model::entities::{app::App, trash::Trash, view::View, workspace::Workspace};
use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder};

View File

@ -182,6 +182,12 @@ pub struct RevisionRange {
pub end: i64,
}
impl std::fmt::Display for RevisionRange {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("[{},{}]", self.start, self.end))
}
}
impl RevisionRange {
pub fn len(&self) -> i64 {
debug_assert!(self.end >= self.start);

View File

@ -1,7 +1,7 @@
pub mod client_document;
pub mod client_folder;
pub mod entities;
pub mod errors;
pub mod folder;
pub mod protobuf;
pub mod server_document;
pub mod server_folder;

View File

@ -1,243 +0,0 @@
use crate::{
document::Document,
entities::{
revision::RevisionRange,
ws::{DocumentServerWSData, DocumentServerWSDataBuilder},
},
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::DocumentPersistence,
util::*,
};
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use parking_lot::RwLock;
use std::{
cmp::Ordering,
fmt::Debug,
sync::{
atomic::{AtomicI64, Ordering::SeqCst},
Arc,
},
time::Duration,
};
pub trait RevisionUser: Send + Sync + Debug {
fn user_id(&self) -> String;
fn receive(&self, resp: SyncResponse);
}
pub enum SyncResponse {
Pull(DocumentServerWSData),
Push(DocumentServerWSData),
Ack(DocumentServerWSData),
NewRevision(RepeatedRevisionPB),
}
pub struct RevisionSynchronizer {
pub doc_id: String,
pub rev_id: AtomicI64,
document: Arc<RwLock<Document>>,
}
impl RevisionSynchronizer {
pub fn new(doc_id: &str, rev_id: i64, document: Document) -> RevisionSynchronizer {
let document = Arc::new(RwLock::new(document));
RevisionSynchronizer {
doc_id: doc_id.to_string(),
rev_id: AtomicI64::new(rev_id),
document,
}
}
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision, persistence), err)]
pub async fn sync_revisions(
&self,
user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn DocumentPersistence>,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
if repeated_revision.get_items().is_empty() {
// Return all the revisions to client
let revisions = persistence.get_doc_revisions(&doc_id).await?;
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?;
let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, repeated_revision);
user.receive(SyncResponse::Push(data));
return Ok(());
}
let server_base_rev_id = self.rev_id.load(SeqCst);
let first_revision = repeated_revision.get_items().first().unwrap().clone();
if self.is_applied_before(&first_revision, &persistence).await {
// Server has received this revision before, so ignore the following revisions
return Ok(());
}
match server_base_rev_id.cmp(&first_revision.rev_id) {
Ordering::Less => {
let server_rev_id = next(server_base_rev_id);
if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id {
// The rev is in the right order, just compose it.
for revision in repeated_revision.get_items() {
let _ = self.compose_revision(revision)?;
}
user.receive(SyncResponse::NewRevision(repeated_revision));
} else {
// The server document is outdated, pull the missing revision from the client.
let range = RevisionRange {
doc_id: self.doc_id.clone(),
start: server_rev_id,
end: first_revision.rev_id,
};
let msg = DocumentServerWSDataBuilder::build_pull_message(&self.doc_id, range);
user.receive(SyncResponse::Pull(msg));
}
}
Ordering::Equal => {
// Do nothing
log::warn!("Applied revision rev_id is the same as cur_rev_id");
}
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = first_revision.rev_id;
let to_rev_id = server_base_rev_id;
let _ = self
.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id)
.await;
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, user, persistence), fields(server_rev_id), err)]
pub async fn pong(
&self,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
client_rev_id: i64,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let server_rev_id = self.rev_id();
tracing::Span::current().record("server_rev_id", &server_rev_id);
match server_rev_id.cmp(&client_rev_id) {
Ordering::Less => {
tracing::error!("Client should not send ping and the server should pull the revisions from the client")
}
Ordering::Equal => tracing::trace!("{} is up to date.", doc_id),
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = client_rev_id;
let to_rev_id = server_rev_id;
tracing::trace!("Push revisions to user");
let _ = self
.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id)
.await;
}
}
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)]
pub async fn reset(
&self,
persistence: Arc<dyn DocumentPersistence>,
repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
tracing::Span::current().record("doc_id", &doc_id.as_str());
let revisions: Vec<RevisionPB> = repeated_revision.get_items().to_vec();
let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions);
let delta = make_delta_from_revision_pb(revisions)?;
let _ = persistence.reset_document(&doc_id, repeated_revision).await?;
*self.document.write() = Document::from_delta(delta);
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(())
}
pub fn doc_json(&self) -> String {
self.document.read().to_json()
}
fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> {
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let _ = self.compose_delta(delta)?;
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
fn transform_revision(&self, revision: &RevisionPB) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> {
let cli_delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let result = self.document.read().delta().transform(&cli_delta)?;
Ok(result)
}
fn compose_delta(&self, delta: RichTextDelta) -> Result<(), CollaborateError> {
if delta.is_empty() {
log::warn!("Composed delta is empty");
}
match self.document.try_write_for(Duration::from_millis(300)) {
None => log::error!("Failed to acquire write lock of document"),
Some(mut write_guard) => {
let _ = write_guard.compose_delta(delta);
}
}
Ok(())
}
pub(crate) fn rev_id(&self) -> i64 {
self.rev_id.load(SeqCst)
}
async fn is_applied_before(&self, new_revision: &RevisionPB, persistence: &Arc<dyn DocumentPersistence>) -> bool {
if let Ok(revisions) = persistence.get_revisions(&self.doc_id, vec![new_revision.rev_id]).await {
if let Some(revision) = revisions.first() {
if revision.md5 == new_revision.md5 {
return true;
}
}
};
false
}
async fn push_revisions_to_user(
&self,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
from: i64,
to: i64,
) {
let rev_ids: Vec<i64> = (from..=to).collect();
let revisions = match persistence.get_revisions(&self.doc_id, rev_ids).await {
Ok(revisions) => {
debug_assert!(!revisions.is_empty(), "revisions should not be empty if the doc exists");
revisions
}
Err(e) => {
tracing::error!("{}", e);
vec![]
}
};
tracing::debug!("Push revision: {} -> {} to client", from, to);
match repeated_revision_from_revision_pbs(revisions) {
Ok(repeated_revision) => {
let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, repeated_revision);
user.receive(SyncResponse::Push(data));
}
Err(e) => tracing::error!("{}", e),
}
}
}
#[inline]
fn next(rev_id: i64) -> i64 {
rev_id + 1
}