2021-10-01 11:39:08 +00:00
|
|
|
use crate::{
|
|
|
|
entities::doc::{RevType, Revision, RevisionRange},
|
|
|
|
errors::DocError,
|
|
|
|
services::{
|
2021-10-02 09:19:54 +00:00
|
|
|
doc::revision::store::{RevisionStore, StoreCmd},
|
2021-10-01 11:39:08 +00:00
|
|
|
util::RevIdCounter,
|
|
|
|
ws::WsDocumentSender,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
2021-10-02 13:35:06 +00:00
|
|
|
use crate::{entities::doc::RevId, errors::DocResult};
|
2021-10-01 11:39:08 +00:00
|
|
|
use flowy_database::ConnectionPool;
|
2021-10-02 09:19:54 +00:00
|
|
|
use flowy_infra::future::ResultFuture;
|
|
|
|
use flowy_ot::core::Delta;
|
2021-10-01 11:39:08 +00:00
|
|
|
use parking_lot::RwLock;
|
|
|
|
use std::{collections::VecDeque, sync::Arc};
|
2021-10-02 13:35:06 +00:00
|
|
|
use tokio::sync::{mpsc, oneshot};
|
2021-10-02 09:19:54 +00:00
|
|
|
|
|
|
|
pub struct DocRevision {
|
2021-10-02 13:35:06 +00:00
|
|
|
pub rev_id: RevId,
|
2021-10-02 09:19:54 +00:00
|
|
|
pub delta: Delta,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub trait RevisionServer: Send + Sync {
|
|
|
|
fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<DocRevision, DocError>;
|
|
|
|
}
|
2021-10-01 11:39:08 +00:00
|
|
|
|
|
|
|
pub struct RevisionManager {
|
|
|
|
doc_id: String,
|
|
|
|
rev_id_counter: RevIdCounter,
|
2021-10-02 09:19:54 +00:00
|
|
|
ws: Arc<dyn WsDocumentSender>,
|
|
|
|
store: mpsc::Sender<StoreCmd>,
|
2021-10-01 11:39:08 +00:00
|
|
|
pending_revs: RwLock<VecDeque<Revision>>,
|
|
|
|
}
|
2021-10-02 09:19:54 +00:00
|
|
|
|
2021-10-01 11:39:08 +00:00
|
|
|
impl RevisionManager {
|
2021-10-02 09:19:54 +00:00
|
|
|
pub async fn new(
|
|
|
|
doc_id: &str,
|
|
|
|
pool: Arc<ConnectionPool>,
|
2021-10-02 13:35:06 +00:00
|
|
|
ws: Arc<dyn WsDocumentSender>,
|
2021-10-02 09:19:54 +00:00
|
|
|
server: Arc<dyn RevisionServer>,
|
|
|
|
) -> DocResult<(Self, Delta)> {
|
|
|
|
let (sender, receiver) = mpsc::channel::<StoreCmd>(50);
|
|
|
|
let store = RevisionStore::new(doc_id, pool, receiver, server);
|
2021-10-01 12:58:13 +00:00
|
|
|
tokio::spawn(store.run());
|
2021-10-01 11:39:08 +00:00
|
|
|
|
2021-10-02 09:19:54 +00:00
|
|
|
let DocRevision { rev_id, delta } = fetch_document(sender.clone()).await?;
|
2021-10-02 13:35:06 +00:00
|
|
|
log::info!("😁Document delta: {:?}", delta);
|
2021-10-02 09:19:54 +00:00
|
|
|
|
2021-10-01 11:39:08 +00:00
|
|
|
let doc_id = doc_id.to_string();
|
2021-10-02 13:35:06 +00:00
|
|
|
let rev_id_counter = RevIdCounter::new(rev_id.into());
|
2021-10-01 11:39:08 +00:00
|
|
|
let pending_revs = RwLock::new(VecDeque::new());
|
2021-10-02 09:19:54 +00:00
|
|
|
let manager = Self {
|
2021-10-01 11:39:08 +00:00
|
|
|
doc_id,
|
|
|
|
rev_id_counter,
|
2021-10-02 13:35:06 +00:00
|
|
|
ws,
|
2021-10-01 11:39:08 +00:00
|
|
|
pending_revs,
|
2021-10-02 09:19:54 +00:00
|
|
|
store: sender,
|
|
|
|
};
|
|
|
|
Ok((manager, delta))
|
2021-10-01 11:39:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); }
|
|
|
|
|
|
|
|
pub fn next_compose_revision(&self) -> Option<Revision> { self.pending_revs.write().pop_front() }
|
|
|
|
|
2021-10-02 09:19:54 +00:00
|
|
|
#[tracing::instrument(level = "debug", skip(self))]
|
2021-10-01 11:39:08 +00:00
|
|
|
pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
|
2021-10-02 09:19:54 +00:00
|
|
|
let cmd = StoreCmd::Revision {
|
2021-10-01 11:39:08 +00:00
|
|
|
revision: revision.clone(),
|
|
|
|
};
|
2021-10-02 09:19:54 +00:00
|
|
|
let _ = self.store.send(cmd).await;
|
2021-10-01 11:39:08 +00:00
|
|
|
|
|
|
|
match revision.ty {
|
2021-10-02 09:19:54 +00:00
|
|
|
RevType::Local => match self.ws.send(revision.into()) {
|
2021-10-01 11:39:08 +00:00
|
|
|
Ok(_) => {},
|
|
|
|
Err(e) => log::error!("Send delta failed: {:?}", e),
|
|
|
|
},
|
|
|
|
RevType::Remote => {
|
|
|
|
self.pending_revs.write().push_back(revision);
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-10-02 13:35:06 +00:00
|
|
|
pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> {
|
2021-10-02 09:19:54 +00:00
|
|
|
let sender = self.store.clone();
|
2021-10-01 11:39:08 +00:00
|
|
|
tokio::spawn(async move {
|
2021-10-02 09:19:54 +00:00
|
|
|
let _ = sender.send(StoreCmd::AckRevision { rev_id }).await;
|
2021-10-01 11:39:08 +00:00
|
|
|
});
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
|
|
|
|
|
|
|
|
pub fn next_rev_id(&self) -> (i64, i64) {
|
|
|
|
let cur = self.rev_id_counter.value();
|
|
|
|
let next = self.rev_id_counter.next();
|
|
|
|
(cur, next)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> {
|
|
|
|
debug_assert!(&range.doc_id == &self.doc_id);
|
|
|
|
let (ret, _rx) = oneshot::channel();
|
2021-10-02 09:19:54 +00:00
|
|
|
let sender = self.store.clone();
|
2021-10-01 11:39:08 +00:00
|
|
|
|
|
|
|
tokio::spawn(async move {
|
2021-10-02 09:19:54 +00:00
|
|
|
let _ = sender.send(StoreCmd::SendRevisions { range, ret }).await;
|
2021-10-01 11:39:08 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
unimplemented!()
|
|
|
|
}
|
|
|
|
}
|
2021-10-02 09:19:54 +00:00
|
|
|
|
|
|
|
async fn fetch_document(sender: mpsc::Sender<StoreCmd>) -> DocResult<DocRevision> {
|
|
|
|
let (ret, rx) = oneshot::channel();
|
|
|
|
let _ = sender.send(StoreCmd::DocumentDelta { ret }).await;
|
|
|
|
|
|
|
|
match rx.await {
|
|
|
|
Ok(result) => Ok(result?),
|
|
|
|
Err(e) => {
|
|
|
|
log::error!("fetch_document: {}", e);
|
|
|
|
Err(DocError::internal().context(format!("fetch_document: {}", e)))
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|