InvokeAI/invokeai/app/api/routers/session_queue.py
2023-10-13 21:49:45 +11:00

248 lines
8.4 KiB
Python

from typing import Optional
from fastapi import Body, Path, Query
from fastapi.routing import APIRouter
from pydantic import BaseModel
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
from invokeai.app.services.session_queue.session_queue_common import (
QUEUE_ITEM_STATUS,
Batch,
BatchStatus,
CancelByBatchIDsResult,
ClearResult,
EnqueueBatchResult,
EnqueueGraphResult,
PruneResult,
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueStatus,
)
from invokeai.app.services.shared.models import CursorPaginatedResults
from ...services.graph import Graph
from ..dependencies import ApiDependencies
session_queue_router = APIRouter(prefix="/v1/queue", tags=["queue"])
class SessionQueueAndProcessorStatus(BaseModel):
"""The overall status of session queue and processor"""
queue: SessionQueueStatus
processor: SessionProcessorStatus
@session_queue_router.post(
"/{queue_id}/enqueue_graph",
operation_id="enqueue_graph",
responses={
201: {"model": EnqueueGraphResult},
},
)
async def enqueue_graph(
queue_id: str = Path(description="The queue id to perform this operation on"),
graph: Graph = Body(description="The graph to enqueue"),
prepend: bool = Body(default=False, description="Whether or not to prepend this batch in the queue"),
) -> EnqueueGraphResult:
"""Enqueues a graph for single execution."""
return ApiDependencies.invoker.services.session_queue.enqueue_graph(queue_id=queue_id, graph=graph, prepend=prepend)
@session_queue_router.post(
"/{queue_id}/enqueue_batch",
operation_id="enqueue_batch",
responses={
201: {"model": EnqueueBatchResult},
},
)
async def enqueue_batch(
queue_id: str = Path(description="The queue id to perform this operation on"),
batch: Batch = Body(description="Batch to process"),
prepend: bool = Body(default=False, description="Whether or not to prepend this batch in the queue"),
) -> EnqueueBatchResult:
"""Processes a batch and enqueues the output graphs for execution."""
return ApiDependencies.invoker.services.session_queue.enqueue_batch(queue_id=queue_id, batch=batch, prepend=prepend)
@session_queue_router.get(
"/{queue_id}/list",
operation_id="list_queue_items",
responses={
200: {"model": CursorPaginatedResults[SessionQueueItemDTO]},
},
)
async def list_queue_items(
queue_id: str = Path(description="The queue id to perform this operation on"),
limit: int = Query(default=50, description="The number of items to fetch"),
status: Optional[QUEUE_ITEM_STATUS] = Query(default=None, description="The status of items to fetch"),
cursor: Optional[int] = Query(default=None, description="The pagination cursor"),
priority: int = Query(default=0, description="The pagination cursor priority"),
) -> CursorPaginatedResults[SessionQueueItemDTO]:
"""Gets all queue items (without graphs)"""
return ApiDependencies.invoker.services.session_queue.list_queue_items(
queue_id=queue_id, limit=limit, status=status, cursor=cursor, priority=priority
)
@session_queue_router.put(
"/{queue_id}/processor/resume",
operation_id="resume",
responses={200: {"model": SessionProcessorStatus}},
)
async def resume(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
"""Resumes session processor"""
return ApiDependencies.invoker.services.session_processor.resume()
@session_queue_router.put(
"/{queue_id}/processor/pause",
operation_id="pause",
responses={200: {"model": SessionProcessorStatus}},
)
async def Pause(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
"""Pauses session processor"""
return ApiDependencies.invoker.services.session_processor.pause()
@session_queue_router.put(
"/{queue_id}/cancel_by_batch_ids",
operation_id="cancel_by_batch_ids",
responses={200: {"model": CancelByBatchIDsResult}},
)
async def cancel_by_batch_ids(
queue_id: str = Path(description="The queue id to perform this operation on"),
batch_ids: list[str] = Body(description="The list of batch_ids to cancel all queue items for", embed=True),
) -> CancelByBatchIDsResult:
"""Immediately cancels all queue items from the given batch ids"""
return ApiDependencies.invoker.services.session_queue.cancel_by_batch_ids(queue_id=queue_id, batch_ids=batch_ids)
@session_queue_router.put(
"/{queue_id}/clear",
operation_id="clear",
responses={
200: {"model": ClearResult},
},
)
async def clear(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> ClearResult:
"""Clears the queue entirely, immediately canceling the currently-executing session"""
queue_item = ApiDependencies.invoker.services.session_queue.get_current(queue_id)
if queue_item is not None:
ApiDependencies.invoker.services.session_queue.cancel_queue_item(queue_item.item_id)
clear_result = ApiDependencies.invoker.services.session_queue.clear(queue_id)
return clear_result
@session_queue_router.put(
"/{queue_id}/prune",
operation_id="prune",
responses={
200: {"model": PruneResult},
},
)
async def prune(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> PruneResult:
"""Prunes all completed or errored queue items"""
return ApiDependencies.invoker.services.session_queue.prune(queue_id)
@session_queue_router.get(
"/{queue_id}/current",
operation_id="get_current_queue_item",
responses={
200: {"model": Optional[SessionQueueItem]},
},
)
async def get_current_queue_item(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> Optional[SessionQueueItem]:
"""Gets the currently execution queue item"""
return ApiDependencies.invoker.services.session_queue.get_current(queue_id)
@session_queue_router.get(
"/{queue_id}/next",
operation_id="get_next_queue_item",
responses={
200: {"model": Optional[SessionQueueItem]},
},
)
async def get_next_queue_item(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> Optional[SessionQueueItem]:
"""Gets the next queue item, without executing it"""
return ApiDependencies.invoker.services.session_queue.get_next(queue_id)
@session_queue_router.get(
"/{queue_id}/status",
operation_id="get_queue_status",
responses={
200: {"model": SessionQueueAndProcessorStatus},
},
)
async def get_queue_status(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionQueueAndProcessorStatus:
"""Gets the status of the session queue"""
queue = ApiDependencies.invoker.services.session_queue.get_queue_status(queue_id)
processor = ApiDependencies.invoker.services.session_processor.get_status()
return SessionQueueAndProcessorStatus(queue=queue, processor=processor)
@session_queue_router.get(
"/{queue_id}/b/{batch_id}/status",
operation_id="get_batch_status",
responses={
200: {"model": BatchStatus},
},
)
async def get_batch_status(
queue_id: str = Path(description="The queue id to perform this operation on"),
batch_id: str = Path(description="The batch to get the status of"),
) -> BatchStatus:
"""Gets the status of the session queue"""
return ApiDependencies.invoker.services.session_queue.get_batch_status(queue_id=queue_id, batch_id=batch_id)
@session_queue_router.get(
"/{queue_id}/i/{item_id}",
operation_id="get_queue_item",
responses={
200: {"model": SessionQueueItem},
},
)
async def get_queue_item(
queue_id: str = Path(description="The queue id to perform this operation on"),
item_id: int = Path(description="The queue item to get"),
) -> SessionQueueItem:
"""Gets a queue item"""
return ApiDependencies.invoker.services.session_queue.get_queue_item(item_id)
@session_queue_router.put(
"/{queue_id}/i/{item_id}/cancel",
operation_id="cancel_queue_item",
responses={
200: {"model": SessionQueueItem},
},
)
async def cancel_queue_item(
queue_id: str = Path(description="The queue id to perform this operation on"),
item_id: int = Path(description="The queue item to cancel"),
) -> SessionQueueItem:
"""Deletes a queue item"""
return ApiDependencies.invoker.services.session_queue.cancel_queue_item(item_id)