mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
refactor: deps crates (#4362)
* refactor: rename flowy-folder-deps to flowy-folder-pub * chore: rename crates * chore: move flowy-task to lib-infra * chore: rename crates * refactor: user manager dir
This commit is contained in:
@ -19,6 +19,12 @@ anyhow.workspace = true
|
||||
walkdir = "2.4.0"
|
||||
tempfile = "3.8.1"
|
||||
validator = "0.16.0"
|
||||
tracing.workspace = true
|
||||
atomic_refcell = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.8.5"
|
||||
futures = "0.3.30"
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
zip = { version = "0.6.6", features = ["deflate"] }
|
||||
|
@ -11,3 +11,5 @@ pub mod future;
|
||||
pub mod ref_map;
|
||||
pub mod util;
|
||||
pub mod validator_fn;
|
||||
|
||||
pub mod priority_task;
|
||||
|
8
frontend/rust-lib/lib-infra/src/priority_task/mod.rs
Normal file
8
frontend/rust-lib/lib-infra/src/priority_task/mod.rs
Normal file
@ -0,0 +1,8 @@
|
||||
mod queue;
|
||||
mod scheduler;
|
||||
mod store;
|
||||
mod task;
|
||||
|
||||
pub use queue::TaskHandlerId;
|
||||
pub use scheduler::*;
|
||||
pub use task::*;
|
130
frontend/rust-lib/lib-infra/src/priority_task/queue.rs
Normal file
130
frontend/rust-lib/lib-infra/src/priority_task/queue.rs
Normal file
@ -0,0 +1,130 @@
|
||||
use crate::priority_task::{PendingTask, Task};
|
||||
use atomic_refcell::AtomicRefCell;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TaskQueue {
|
||||
// index_tasks for quick access
|
||||
index_tasks: HashMap<TaskHandlerId, Arc<AtomicRefCell<TaskList>>>,
|
||||
queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
|
||||
}
|
||||
|
||||
impl TaskQueue {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub(crate) fn push(&mut self, task: &Task) {
|
||||
if task.content.is_none() {
|
||||
tracing::warn!(
|
||||
"The task:{} with empty content will be not executed",
|
||||
task.id
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let pending_task = PendingTask {
|
||||
qos: task.qos,
|
||||
id: task.id,
|
||||
};
|
||||
match self.index_tasks.entry(task.handler_id.clone()) {
|
||||
Entry::Occupied(entry) => {
|
||||
let mut list = entry.get().borrow_mut();
|
||||
assert!(list
|
||||
.peek()
|
||||
.map(|old_id| pending_task.id >= old_id.id)
|
||||
.unwrap_or(true));
|
||||
list.push(pending_task);
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
let mut task_list = TaskList::new(entry.key());
|
||||
task_list.push(pending_task);
|
||||
let task_list = Arc::new(AtomicRefCell::new(task_list));
|
||||
entry.insert(task_list.clone());
|
||||
self.queue.push(task_list);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.queue.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
|
||||
where
|
||||
F: FnMut(&mut TaskList) -> Option<T>,
|
||||
{
|
||||
let head = self.queue.pop()?;
|
||||
let result = {
|
||||
let mut ref_head = head.borrow_mut();
|
||||
f(&mut ref_head)
|
||||
};
|
||||
if !head.borrow().tasks.is_empty() {
|
||||
self.queue.push(head);
|
||||
} else {
|
||||
self.index_tasks.remove(&head.borrow().id);
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
pub type TaskHandlerId = String;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TaskList {
|
||||
pub(crate) id: TaskHandlerId,
|
||||
tasks: BinaryHeap<PendingTask>,
|
||||
}
|
||||
|
||||
impl Deref for TaskList {
|
||||
type Target = BinaryHeap<PendingTask>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.tasks
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for TaskList {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.tasks
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskList {
|
||||
fn new(id: &str) -> Self {
|
||||
Self {
|
||||
id: id.to_owned(),
|
||||
tasks: BinaryHeap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for TaskList {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.id == other.id
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for TaskList {}
|
||||
|
||||
impl Ord for TaskList {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self.peek(), other.peek()) {
|
||||
(None, None) => Ordering::Equal,
|
||||
(None, Some(_)) => Ordering::Less,
|
||||
(Some(_), None) => Ordering::Greater,
|
||||
(Some(lhs), Some(rhs)) => lhs.cmp(rhs),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for TaskList {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
205
frontend/rust-lib/lib-infra/src/priority_task/scheduler.rs
Normal file
205
frontend/rust-lib/lib-infra/src/priority_task/scheduler.rs
Normal file
@ -0,0 +1,205 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::future::BoxResultFuture;
|
||||
use crate::priority_task::queue::TaskQueue;
|
||||
use crate::priority_task::store::TaskStore;
|
||||
use crate::priority_task::{Task, TaskContent, TaskId, TaskState};
|
||||
use anyhow::Error;
|
||||
use tokio::sync::{watch, RwLock};
|
||||
use tokio::time::interval;
|
||||
|
||||
pub struct TaskDispatcher {
|
||||
queue: TaskQueue,
|
||||
store: TaskStore,
|
||||
timeout: Duration,
|
||||
handlers: HashMap<String, Arc<dyn TaskHandler>>,
|
||||
|
||||
notifier: watch::Sender<bool>,
|
||||
pub(crate) notifier_rx: Option<watch::Receiver<bool>>,
|
||||
}
|
||||
|
||||
impl TaskDispatcher {
|
||||
pub fn new(timeout: Duration) -> Self {
|
||||
let (notifier, notifier_rx) = watch::channel(false);
|
||||
Self {
|
||||
queue: TaskQueue::new(),
|
||||
store: TaskStore::new(),
|
||||
timeout,
|
||||
handlers: HashMap::new(),
|
||||
notifier,
|
||||
notifier_rx: Some(notifier_rx),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_handler<T>(&mut self, handler: T)
|
||||
where
|
||||
T: TaskHandler,
|
||||
{
|
||||
let handler_id = handler.handler_id().to_owned();
|
||||
self.handlers.insert(handler_id, Arc::new(handler));
|
||||
}
|
||||
|
||||
pub async fn unregister_handler<T: AsRef<str>>(&mut self, handler_id: T) {
|
||||
if let Some(handler) = self.handlers.remove(handler_id.as_ref()) {
|
||||
tracing::trace!(
|
||||
"{}:{} is unregistered",
|
||||
handler.handler_name(),
|
||||
handler.handler_id()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&mut self) {
|
||||
let _ = self.notifier.send(true);
|
||||
self.queue.clear();
|
||||
self.store.clear();
|
||||
}
|
||||
|
||||
pub(crate) async fn process_next_task(&mut self) -> Option<()> {
|
||||
let pending_task = self.queue.mut_head(|list| list.pop())?;
|
||||
let mut task = self.store.remove_task(&pending_task.id)?;
|
||||
let ret = task.ret.take()?;
|
||||
|
||||
// Do not execute the task if the task was cancelled.
|
||||
if task.state().is_cancel() {
|
||||
let _ = ret.send(task.into());
|
||||
self.notify();
|
||||
return None;
|
||||
}
|
||||
|
||||
let content = task.content.take()?;
|
||||
if let Some(handler) = self.handlers.get(&task.handler_id) {
|
||||
task.set_state(TaskState::Processing);
|
||||
tracing::trace!("{} task is running", handler.handler_name(),);
|
||||
match tokio::time::timeout(self.timeout, handler.run(content)).await {
|
||||
Ok(result) => match result {
|
||||
Ok(_) => {
|
||||
tracing::trace!("{} task is done", handler.handler_name(),);
|
||||
task.set_state(TaskState::Done)
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!("{} task is failed: {:?}", handler.handler_name(), e);
|
||||
task.set_state(TaskState::Failure);
|
||||
},
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!("{} task is timeout: {:?}", handler.handler_name(), e);
|
||||
task.set_state(TaskState::Timeout);
|
||||
},
|
||||
}
|
||||
} else {
|
||||
tracing::trace!("{} is cancel", task.handler_id);
|
||||
task.set_state(TaskState::Cancel);
|
||||
}
|
||||
let _ = ret.send(task.into());
|
||||
self.notify();
|
||||
None
|
||||
}
|
||||
|
||||
pub fn add_task(&mut self, task: Task) {
|
||||
debug_assert!(!task.state().is_done());
|
||||
if task.state().is_done() {
|
||||
tracing::warn!("Should not add a task which state is done");
|
||||
return;
|
||||
}
|
||||
|
||||
self.queue.push(&task);
|
||||
self.store.insert_task(task);
|
||||
self.notify();
|
||||
}
|
||||
|
||||
pub fn read_task(&self, task_id: &TaskId) -> Option<&Task> {
|
||||
self.store.read_task(task_id)
|
||||
}
|
||||
|
||||
pub fn cancel_task(&mut self, task_id: TaskId) {
|
||||
if let Some(task) = self.store.mut_task(&task_id) {
|
||||
task.set_state(TaskState::Cancel);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear_task(&mut self) {
|
||||
self.store.clear();
|
||||
}
|
||||
pub fn next_task_id(&self) -> TaskId {
|
||||
self.store.next_task_id()
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self) {
|
||||
let _ = self.notifier.send(false);
|
||||
}
|
||||
}
|
||||
pub struct TaskRunner();
|
||||
impl TaskRunner {
|
||||
pub async fn run(dispatcher: Arc<RwLock<TaskDispatcher>>) {
|
||||
dispatcher.read().await.notify();
|
||||
let debounce_duration = Duration::from_millis(300);
|
||||
let mut notifier = dispatcher
|
||||
.write()
|
||||
.await
|
||||
.notifier_rx
|
||||
.take()
|
||||
.expect("Only take once");
|
||||
loop {
|
||||
// stops the runner if the notifier was closed.
|
||||
if notifier.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
// stops the runner if the value of notifier is `true`
|
||||
if *notifier.borrow() {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut interval = interval(debounce_duration);
|
||||
interval.tick().await;
|
||||
let _ = dispatcher.write().await.process_next_task().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TaskHandler: Send + Sync + 'static {
|
||||
fn handler_id(&self) -> &str;
|
||||
|
||||
fn handler_name(&self) -> &str {
|
||||
""
|
||||
}
|
||||
|
||||
fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error>;
|
||||
}
|
||||
|
||||
impl<T> TaskHandler for Box<T>
|
||||
where
|
||||
T: TaskHandler,
|
||||
{
|
||||
fn handler_id(&self) -> &str {
|
||||
(**self).handler_id()
|
||||
}
|
||||
|
||||
fn handler_name(&self) -> &str {
|
||||
(**self).handler_name()
|
||||
}
|
||||
|
||||
fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
|
||||
(**self).run(content)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TaskHandler for Arc<T>
|
||||
where
|
||||
T: TaskHandler,
|
||||
{
|
||||
fn handler_id(&self) -> &str {
|
||||
(**self).handler_id()
|
||||
}
|
||||
|
||||
fn handler_name(&self) -> &str {
|
||||
(**self).handler_name()
|
||||
}
|
||||
|
||||
fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
|
||||
(**self).run(content)
|
||||
}
|
||||
}
|
51
frontend/rust-lib/lib-infra/src/priority_task/store.rs
Normal file
51
frontend/rust-lib/lib-infra/src/priority_task/store.rs
Normal file
@ -0,0 +1,51 @@
|
||||
use crate::priority_task::{Task, TaskId, TaskState};
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use std::sync::atomic::Ordering::SeqCst;
|
||||
|
||||
pub(crate) struct TaskStore {
|
||||
tasks: HashMap<TaskId, Task>,
|
||||
task_id_counter: AtomicU32,
|
||||
}
|
||||
|
||||
impl TaskStore {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
tasks: HashMap::new(),
|
||||
task_id_counter: AtomicU32::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn insert_task(&mut self, task: Task) {
|
||||
self.tasks.insert(task.id, task);
|
||||
}
|
||||
|
||||
pub(crate) fn remove_task(&mut self, task_id: &TaskId) -> Option<Task> {
|
||||
self.tasks.remove(task_id)
|
||||
}
|
||||
|
||||
pub(crate) fn mut_task(&mut self, task_id: &TaskId) -> Option<&mut Task> {
|
||||
self.tasks.get_mut(task_id)
|
||||
}
|
||||
|
||||
pub(crate) fn read_task(&self, task_id: &TaskId) -> Option<&Task> {
|
||||
self.tasks.get(task_id)
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self) {
|
||||
let tasks = mem::take(&mut self.tasks);
|
||||
tasks.into_values().for_each(|mut task| {
|
||||
if task.ret.is_some() {
|
||||
let ret = task.ret.take().unwrap();
|
||||
task.set_state(TaskState::Cancel);
|
||||
let _ = ret.send(task.into());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn next_task_id(&self) -> TaskId {
|
||||
let _ = self.task_id_counter.fetch_add(1, SeqCst);
|
||||
self.task_id_counter.load(SeqCst)
|
||||
}
|
||||
}
|
145
frontend/rust-lib/lib-infra/src/priority_task/task.rs
Normal file
145
frontend/rust-lib/lib-infra/src/priority_task/task.rs
Normal file
@ -0,0 +1,145 @@
|
||||
use crate::priority_task::TaskHandlerId;
|
||||
use std::cmp::Ordering;
|
||||
use tokio::sync::oneshot::{Receiver, Sender};
|
||||
|
||||
#[derive(Eq, Debug, Clone, Copy)]
|
||||
pub enum QualityOfService {
|
||||
Background,
|
||||
UserInteractive,
|
||||
}
|
||||
|
||||
impl PartialEq for QualityOfService {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
matches!(
|
||||
(self, other),
|
||||
(Self::Background, Self::Background) | (Self::UserInteractive, Self::UserInteractive)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub type TaskId = u32;
|
||||
|
||||
#[derive(Eq, Debug, Clone, Copy)]
|
||||
pub struct PendingTask {
|
||||
pub qos: QualityOfService,
|
||||
pub id: TaskId,
|
||||
}
|
||||
|
||||
impl PartialEq for PendingTask {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.id.eq(&other.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for PendingTask {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for PendingTask {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self.qos, other.qos) {
|
||||
// User interactive
|
||||
(QualityOfService::UserInteractive, QualityOfService::UserInteractive) => {
|
||||
self.id.cmp(&other.id)
|
||||
},
|
||||
(QualityOfService::UserInteractive, _) => Ordering::Greater,
|
||||
(_, QualityOfService::UserInteractive) => Ordering::Less,
|
||||
// background
|
||||
(QualityOfService::Background, QualityOfService::Background) => self.id.cmp(&other.id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum TaskContent {
|
||||
Text(String),
|
||||
Blob(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum TaskState {
|
||||
Pending,
|
||||
Processing,
|
||||
Done,
|
||||
Failure,
|
||||
Cancel,
|
||||
Timeout,
|
||||
}
|
||||
|
||||
impl TaskState {
|
||||
pub fn is_pending(&self) -> bool {
|
||||
matches!(self, TaskState::Pending)
|
||||
}
|
||||
pub fn is_done(&self) -> bool {
|
||||
matches!(self, TaskState::Done)
|
||||
}
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(self, TaskState::Cancel)
|
||||
}
|
||||
|
||||
pub fn is_processing(&self) -> bool {
|
||||
matches!(self, TaskState::Processing)
|
||||
}
|
||||
|
||||
pub fn is_failed(&self) -> bool {
|
||||
matches!(self, TaskState::Failure)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Task {
|
||||
pub id: TaskId,
|
||||
pub handler_id: TaskHandlerId,
|
||||
pub content: Option<TaskContent>,
|
||||
pub qos: QualityOfService,
|
||||
state: TaskState,
|
||||
pub ret: Option<Sender<TaskResult>>,
|
||||
pub recv: Option<Receiver<TaskResult>>,
|
||||
}
|
||||
|
||||
impl Task {
|
||||
pub fn background(handler_id: &str, id: TaskId, content: TaskContent) -> Self {
|
||||
Self::new(handler_id, id, content, QualityOfService::Background)
|
||||
}
|
||||
|
||||
pub fn user_interactive(handler_id: &str, id: TaskId, content: TaskContent) -> Self {
|
||||
Self::new(handler_id, id, content, QualityOfService::UserInteractive)
|
||||
}
|
||||
|
||||
pub fn new(handler_id: &str, id: TaskId, content: TaskContent, qos: QualityOfService) -> Self {
|
||||
let handler_id = handler_id.to_owned();
|
||||
let (ret, recv) = tokio::sync::oneshot::channel();
|
||||
Self {
|
||||
handler_id,
|
||||
id,
|
||||
content: Some(content),
|
||||
qos,
|
||||
ret: Some(ret),
|
||||
recv: Some(recv),
|
||||
state: TaskState::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn state(&self) -> &TaskState {
|
||||
&self.state
|
||||
}
|
||||
|
||||
pub(crate) fn set_state(&mut self, status: TaskState) {
|
||||
self.state = status;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TaskResult {
|
||||
pub id: TaskId,
|
||||
pub state: TaskState,
|
||||
}
|
||||
|
||||
impl std::convert::From<Task> for TaskResult {
|
||||
fn from(task: Task) -> Self {
|
||||
TaskResult {
|
||||
id: task.id,
|
||||
state: task.state().clone(),
|
||||
}
|
||||
}
|
||||
}
|
1
frontend/rust-lib/lib-infra/tests/main.rs
Normal file
1
frontend/rust-lib/lib-infra/tests/main.rs
Normal file
@ -0,0 +1 @@
|
||||
mod task_test;
|
3
frontend/rust-lib/lib-infra/tests/task_test/mod.rs
Normal file
3
frontend/rust-lib/lib-infra/tests/task_test/mod.rs
Normal file
@ -0,0 +1,3 @@
|
||||
mod script;
|
||||
mod task_cancel_test;
|
||||
mod task_order_test;
|
216
frontend/rust-lib/lib-infra/tests/task_test/script.rs
Normal file
216
frontend/rust-lib/lib-infra/tests/task_test/script.rs
Normal file
@ -0,0 +1,216 @@
|
||||
use anyhow::Error;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use lib_infra::async_trait::async_trait;
|
||||
use lib_infra::future::BoxResultFuture;
|
||||
use lib_infra::priority_task::{
|
||||
Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState,
|
||||
};
|
||||
use lib_infra::ref_map::RefCountValue;
|
||||
use rand::Rng;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot::Receiver;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub enum SearchScript {
|
||||
AddTask {
|
||||
task: Task,
|
||||
},
|
||||
AddTasks {
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
#[allow(dead_code)]
|
||||
Wait {
|
||||
millisecond: u64,
|
||||
},
|
||||
CancelTask {
|
||||
task_id: TaskId,
|
||||
},
|
||||
UnregisterHandler {
|
||||
handler_id: String,
|
||||
},
|
||||
AssertTaskStatus {
|
||||
task_id: TaskId,
|
||||
expected_status: TaskState,
|
||||
},
|
||||
AssertExecuteOrder {
|
||||
execute_order: Vec<u32>,
|
||||
rets: Vec<Receiver<TaskResult>>,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct SearchTest {
|
||||
scheduler: Arc<RwLock<TaskDispatcher>>,
|
||||
}
|
||||
|
||||
impl SearchTest {
|
||||
pub async fn new() -> Self {
|
||||
let duration = Duration::from_millis(1000);
|
||||
let mut scheduler = TaskDispatcher::new(duration);
|
||||
scheduler.register_handler(Arc::new(MockTextTaskHandler()));
|
||||
scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
|
||||
scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
|
||||
|
||||
let scheduler = Arc::new(RwLock::new(scheduler));
|
||||
tokio::spawn(TaskRunner::run(scheduler.clone()));
|
||||
|
||||
Self { scheduler }
|
||||
}
|
||||
|
||||
pub async fn next_task_id(&self) -> TaskId {
|
||||
self.scheduler.read().await.next_task_id()
|
||||
}
|
||||
|
||||
pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
|
||||
for script in scripts {
|
||||
self.run_script(script).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_script(&self, script: SearchScript) {
|
||||
match script {
|
||||
SearchScript::AddTask { task } => {
|
||||
self.scheduler.write().await.add_task(task);
|
||||
},
|
||||
SearchScript::CancelTask { task_id } => {
|
||||
self.scheduler.write().await.cancel_task(task_id);
|
||||
},
|
||||
SearchScript::AddTasks { tasks } => {
|
||||
let mut scheduler = self.scheduler.write().await;
|
||||
for task in tasks {
|
||||
scheduler.add_task(task);
|
||||
}
|
||||
},
|
||||
SearchScript::Wait { millisecond } => {
|
||||
tokio::time::sleep(Duration::from_millis(millisecond)).await;
|
||||
},
|
||||
SearchScript::UnregisterHandler { handler_id } => {
|
||||
self
|
||||
.scheduler
|
||||
.write()
|
||||
.await
|
||||
.unregister_handler(handler_id)
|
||||
.await;
|
||||
},
|
||||
SearchScript::AssertTaskStatus {
|
||||
task_id,
|
||||
expected_status,
|
||||
} => {
|
||||
let status = self
|
||||
.scheduler
|
||||
.read()
|
||||
.await
|
||||
.read_task(&task_id)
|
||||
.unwrap()
|
||||
.state()
|
||||
.clone();
|
||||
assert_eq!(status, expected_status);
|
||||
},
|
||||
SearchScript::AssertExecuteOrder {
|
||||
execute_order,
|
||||
rets,
|
||||
} => {
|
||||
let mut futures = FuturesUnordered::new();
|
||||
for ret in rets {
|
||||
futures.push(ret);
|
||||
}
|
||||
let mut orders = vec![];
|
||||
while let Some(Ok(result)) = futures.next().await {
|
||||
orders.push(result.id);
|
||||
assert!(result.state.is_done());
|
||||
}
|
||||
assert_eq!(execute_order, orders);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MockTextTaskHandler();
|
||||
#[async_trait]
|
||||
impl RefCountValue for MockTextTaskHandler {
|
||||
async fn did_remove(&self) {}
|
||||
}
|
||||
|
||||
impl TaskHandler for MockTextTaskHandler {
|
||||
fn handler_id(&self) -> &str {
|
||||
"1"
|
||||
}
|
||||
|
||||
fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
|
||||
let mut rng = rand::thread_rng();
|
||||
let millisecond = rng.gen_range(1..50);
|
||||
Box::pin(async move {
|
||||
match content {
|
||||
TaskContent::Text(_s) => {
|
||||
tokio::time::sleep(Duration::from_millis(millisecond)).await;
|
||||
},
|
||||
TaskContent::Blob(_) => panic!("Only support text"),
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
|
||||
let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
|
||||
let recv = task.recv.take().unwrap();
|
||||
(task, recv)
|
||||
}
|
||||
|
||||
pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
|
||||
let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
|
||||
let recv = task.recv.take().unwrap();
|
||||
(task, recv)
|
||||
}
|
||||
|
||||
pub struct MockBlobTaskHandler();
|
||||
#[async_trait]
|
||||
impl RefCountValue for MockBlobTaskHandler {
|
||||
async fn did_remove(&self) {}
|
||||
}
|
||||
|
||||
impl TaskHandler for MockBlobTaskHandler {
|
||||
fn handler_id(&self) -> &str {
|
||||
"2"
|
||||
}
|
||||
|
||||
fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
|
||||
Box::pin(async move {
|
||||
match content {
|
||||
TaskContent::Text(_) => panic!("Only support blob"),
|
||||
TaskContent::Blob(bytes) => {
|
||||
let _msg = String::from_utf8(bytes).unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MockTimeoutTaskHandler();
|
||||
|
||||
impl TaskHandler for MockTimeoutTaskHandler {
|
||||
fn handler_id(&self) -> &str {
|
||||
"3"
|
||||
}
|
||||
|
||||
fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
|
||||
Box::pin(async move {
|
||||
match content {
|
||||
TaskContent::Text(_) => panic!("Only support blob"),
|
||||
TaskContent::Blob(_bytes) => {
|
||||
tokio::time::sleep(Duration::from_millis(2000)).await;
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
|
||||
let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
|
||||
let recv = task.recv.take().unwrap();
|
||||
(task, recv)
|
||||
}
|
@ -0,0 +1,91 @@
|
||||
use crate::task_test::script::SearchScript::*;
|
||||
use crate::task_test::script::{make_text_background_task, make_timeout_task, SearchTest};
|
||||
use lib_infra::priority_task::{QualityOfService, Task, TaskContent, TaskState};
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_cancel_background_task_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task_1, ret_1) = make_text_background_task(test.next_task_id().await, "Hello world");
|
||||
let (task_2, ret_2) = make_text_background_task(test.next_task_id().await, "");
|
||||
test
|
||||
.run_scripts(vec![
|
||||
AddTask { task: task_1 },
|
||||
AddTask { task: task_2 },
|
||||
AssertTaskStatus {
|
||||
task_id: 1,
|
||||
expected_status: TaskState::Pending,
|
||||
},
|
||||
AssertTaskStatus {
|
||||
task_id: 2,
|
||||
expected_status: TaskState::Pending,
|
||||
},
|
||||
CancelTask { task_id: 2 },
|
||||
AssertTaskStatus {
|
||||
task_id: 2,
|
||||
expected_status: TaskState::Cancel,
|
||||
},
|
||||
])
|
||||
.await;
|
||||
|
||||
let result = ret_1.await.unwrap();
|
||||
assert_eq!(result.state, TaskState::Done);
|
||||
|
||||
let result = ret_2.await.unwrap();
|
||||
assert_eq!(result.state, TaskState::Cancel);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_with_empty_handler_id_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let mut task = Task::new(
|
||||
"",
|
||||
test.next_task_id().await,
|
||||
TaskContent::Text("".to_owned()),
|
||||
QualityOfService::Background,
|
||||
);
|
||||
let ret = task.recv.take().unwrap();
|
||||
test.run_scripts(vec![AddTask { task }]).await;
|
||||
|
||||
let result = ret.await.unwrap();
|
||||
assert_eq!(result.state, TaskState::Cancel);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_can_not_find_handler_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task, ret) = make_text_background_task(test.next_task_id().await, "Hello world");
|
||||
let handler_id = task.handler_id.clone();
|
||||
test
|
||||
.run_scripts(vec![UnregisterHandler { handler_id }, AddTask { task }])
|
||||
.await;
|
||||
|
||||
let result = ret.await.unwrap();
|
||||
assert_eq!(result.state, TaskState::Cancel);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_can_not_find_handler_test2() {
|
||||
let test = SearchTest::new().await;
|
||||
let mut tasks = vec![];
|
||||
let mut rets = vec![];
|
||||
let handler_id = "1".to_owned();
|
||||
for _i in 1..10000 {
|
||||
let (task, ret) = make_text_background_task(test.next_task_id().await, "");
|
||||
tasks.push(task);
|
||||
rets.push(ret);
|
||||
}
|
||||
|
||||
test
|
||||
.run_scripts(vec![UnregisterHandler { handler_id }, AddTasks { tasks }])
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_run_timeout_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task, ret) = make_timeout_task(test.next_task_id().await);
|
||||
test.run_scripts(vec![AddTask { task }]).await;
|
||||
|
||||
let result = ret.await.unwrap();
|
||||
assert_eq!(result.state, TaskState::Timeout);
|
||||
}
|
115
frontend/rust-lib/lib-infra/tests/task_test/task_order_test.rs
Normal file
115
frontend/rust-lib/lib-infra/tests/task_test/task_order_test.rs
Normal file
@ -0,0 +1,115 @@
|
||||
use crate::task_test::script::{
|
||||
make_text_background_task, make_text_user_interactive_task, SearchScript::*, SearchTest,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_add_single_background_task_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task, ret) = make_text_background_task(test.next_task_id().await, "");
|
||||
test.run_scripts(vec![AddTask { task }]).await;
|
||||
|
||||
let result = ret.await.unwrap();
|
||||
assert!(result.state.is_done())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_add_multiple_background_tasks_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task_1, ret_1) = make_text_background_task(test.next_task_id().await, "");
|
||||
let (task_2, ret_2) = make_text_background_task(test.next_task_id().await, "");
|
||||
let (task_3, ret_3) = make_text_background_task(test.next_task_id().await, "");
|
||||
test
|
||||
.run_scripts(vec![
|
||||
AddTask { task: task_1 },
|
||||
AddTask { task: task_2 },
|
||||
AddTask { task: task_3 },
|
||||
AssertExecuteOrder {
|
||||
execute_order: vec![3, 2, 1],
|
||||
rets: vec![ret_1, ret_2, ret_3],
|
||||
},
|
||||
])
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_add_multiple_user_interactive_tasks_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task_1, ret_1) = make_text_user_interactive_task(test.next_task_id().await, "");
|
||||
let (task_2, ret_2) = make_text_user_interactive_task(test.next_task_id().await, "");
|
||||
let (task_3, ret_3) = make_text_user_interactive_task(test.next_task_id().await, "");
|
||||
test
|
||||
.run_scripts(vec![
|
||||
AddTask { task: task_1 },
|
||||
AddTask { task: task_2 },
|
||||
AddTask { task: task_3 },
|
||||
AssertExecuteOrder {
|
||||
execute_order: vec![3, 2, 1],
|
||||
rets: vec![ret_1, ret_2, ret_3],
|
||||
},
|
||||
])
|
||||
.await;
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn task_add_multiple_different_kind_tasks_test() {
|
||||
let test = SearchTest::new().await;
|
||||
let (task_1, ret_1) = make_text_background_task(test.next_task_id().await, "");
|
||||
let (task_2, ret_2) = make_text_user_interactive_task(test.next_task_id().await, "");
|
||||
let (task_3, ret_3) = make_text_background_task(test.next_task_id().await, "");
|
||||
test
|
||||
.run_scripts(vec![
|
||||
AddTask { task: task_1 },
|
||||
AddTask { task: task_2 },
|
||||
AddTask { task: task_3 },
|
||||
AssertExecuteOrder {
|
||||
execute_order: vec![2, 3, 1],
|
||||
rets: vec![ret_1, ret_2, ret_3],
|
||||
},
|
||||
])
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_add_multiple_different_kind_tasks_test2() {
|
||||
let test = SearchTest::new().await;
|
||||
let mut tasks = vec![];
|
||||
let mut rets = vec![];
|
||||
|
||||
for i in 0..10 {
|
||||
let (task, ret) = if i % 2 == 0 {
|
||||
make_text_background_task(test.next_task_id().await, "")
|
||||
} else {
|
||||
make_text_user_interactive_task(test.next_task_id().await, "")
|
||||
};
|
||||
tasks.push(task);
|
||||
rets.push(ret);
|
||||
}
|
||||
|
||||
test
|
||||
.run_scripts(vec![
|
||||
AddTasks { tasks },
|
||||
AssertExecuteOrder {
|
||||
execute_order: vec![10, 8, 6, 4, 2, 9, 7, 5, 3, 1],
|
||||
rets,
|
||||
},
|
||||
])
|
||||
.await;
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn task_add_1000_tasks_test() {
|
||||
// let test = SearchTest::new().await;
|
||||
// let mut tasks = vec![];
|
||||
// let mut execute_order = vec![];
|
||||
// let mut rets = vec![];
|
||||
//
|
||||
// for i in 1..1000 {
|
||||
// let (task, ret) = make_text_background_task(test.next_task_id().await, "");
|
||||
// execute_order.push(i);
|
||||
// tasks.push(task);
|
||||
// rets.push(ret);
|
||||
// }
|
||||
// execute_order.reverse();
|
||||
//
|
||||
// test.run_scripts(vec![AddTasks { tasks }, AssertExecuteOrder { execute_order, rets }])
|
||||
// .await;
|
||||
// }
|
Reference in New Issue
Block a user