feat: queued generation (#4502)

* fix(config): fix typing issues in `config/`

`config/invokeai_config.py`:
- use `Optional` for things that are optional
- fix typing of `ram_cache_size()` and `vram_cache_size()`
- remove unused and incorrectly typed method `autoconvert_path`
- fix types and logic for `parse_args()`, in which `InvokeAIAppConfig.initconf` *must* be a `DictConfig`, but function would allow it to be set as a `ListConfig`, which presumably would cause issues elsewhere

`config/base.py`:
- use `cls` for first arg of class methods
- use `Optional` for things that are optional
- fix minor type issue related to setting of `env_prefix`
- remove unused `add_subparser()` method, which calls `add_parser()` on an `ArgumentParser` (method only available on the `_SubParsersAction` object, which is returned from ArgumentParser.add_subparsers()`)

* feat: queued generation and batches

Due to a very messy branch with broad addition of `isort` on `main` alongside it, some git surgery was needed to get an agreeable git history. This commit represents all of the work on queued generation. See PR for notes.

* chore: flake8, isort, black

* fix(nodes): fix incorrect service stop() method

* fix(nodes): improve names of a few variables

* fix(tests): fix up tests after changes to batches/queue

* feat(tests): add unit tests for session queue helper functions

* feat(ui): dynamic prompts is always enabled

* feat(queue): add queue_status_changed event

* feat(ui): wip queue graphs

* feat(nodes): move cleanup til after invoker startup

* feat(nodes): add cancel_by_batch_ids

* feat(ui): wip batch graphs & UI

* fix(nodes): remove `Batch.batch_id` from required

* fix(ui): cleanup and use fixedCacheKey for all mutations

* fix(ui): remove orphaned nodes from canvas graphs

* fix(nodes): fix cancel_by_batch_ids result count

* fix(ui): only show cancel batch tooltip when batches were canceled

* chore: isort

* fix(api): return `[""]` when dynamic prompts generates no prompts

Just a simple fallback so we always have a prompt.

* feat(ui): dynamicPrompts.combinatorial is always on

There seems to be little purpose in using the combinatorial generation for dynamic prompts. I've disabled it by hiding it from the UI and defaulting combinatorial to true. If we want to enable it again in the future it's straightforward to do so.

* feat: add queue_id & support logic

* feat(ui): fix upscale button

It prepends the upscale operation to queue

* feat(nodes): return queue item when enqueuing a single graph

This facilitates one-off graph async workflows in the client.

* feat(ui): move controlnet autoprocess to queue

* fix(ui): fix non-serializable DOMRect in redux state

* feat(ui): QueueTable performance tweaks

* feat(ui): update queue list

Queue items expand to show the full queue item. Just as JSON for now.

* wip threaded session_processor

* feat(nodes,ui): fully migrate queue to session_processor

* feat(nodes,ui): add processor events

* feat(ui): ui tweaks

* feat(nodes,ui): consolidate events, reduce network requests

* feat(ui): cleanup & abstract queue hooks

* feat(nodes): optimize batch permutation

Use a generator to do only as much work as is needed.

Previously, though we only ended up creating exactly as many queue items as was needed, there was still some intermediary work that calculated *all* permutations. When that number was very high, the system had a very hard time and used a lot of memory.

The logic has been refactored to use a generator. Additionally, the batch validators are optimized to return early and use less memory.

* feat(ui): add seed behaviour parameter

This dynamic prompts parameter allows the seed to be randomized per prompt or per iteration:
- Per iteration: Use the same seed for all prompts in a single dynamic prompt expansion
- Per prompt: Use a different seed for every single prompt

"Per iteration" is appropriate for exploring a the latents space with a stable starting noise, while "Per prompt" provides more variation.

* fix(ui): remove extraneous random seed nodes from linear graphs

* fix(ui): fix controlnet autoprocess not working when queue is running

* feat(queue): add timestamps to queue status updates

Also show execution time in queue list

* feat(queue): change all execution-related events to use the `queue_id` as the room, also include `queue_item_id` in InvocationQueueItem

This allows for much simpler handling of queue items.

* feat(api): deprecate sessions router

* chore(backend): tidy logging in `dependencies.py`

* fix(backend): respect `use_memory_db`

* feat(backend): add `config.log_sql` (enables sql trace logging)

* feat: add invocation cache

Supersedes #4574

The invocation cache provides simple node memoization functionality. Nodes that use the cache are memoized and not re-executed if their inputs haven't changed. Instead, the stored output is returned.

## Results

This feature provides anywhere some significant to massive performance improvement.

The improvement is most marked on large batches of generations where you only change a couple things (e.g. different seed or prompt for each iteration) and low-VRAM systems, where skipping an extraneous model load is a big deal.

## Overview

A new `invocation_cache` service is added to handle the caching. There's not much to it.

All nodes now inherit a boolean `use_cache` field from `BaseInvocation`. This is a node field and not a class attribute, because specific instances of nodes may want to opt in or out of caching.

The recently-added `invoke_internal()` method on `BaseInvocation` is used as an entrypoint for the cache logic.

To create a cache key, the invocation is first serialized using pydantic's provided `json()` method, skipping the unique `id` field. Then python's very fast builtin `hash()` is used to create an integer key. All implementations of `InvocationCacheBase` must provide a class method `create_key()` which accepts an invocation and outputs a string or integer key.

## In-Memory Implementation

An in-memory implementation is provided. In this implementation, the node outputs are stored in memory as python classes. The in-memory cache does not persist application restarts.

Max node cache size is added as `node_cache_size` under the `Generation` config category.

It defaults to 512 - this number is up for discussion, but given that these are relatively lightweight pydantic models, I think it's safe to up this even higher.

Note that the cache isn't storing the big stuff - tensors and images are store on disk, and outputs include only references to them.

## Node Definition

The default for all nodes is to use the cache. The `@invocation` decorator now accepts an optional `use_cache: bool` argument to override the default of `True`.

Non-deterministic nodes, however, should set this to `False`. Currently, all random-stuff nodes, including `dynamic_prompt`, are set to `False`.

The field name `use_cache` is now effectively a reserved field name and possibly a breaking change if any community nodes use this as a field name. In hindsight, all our reserved field names should have been prefixed with underscores or something.

## One Gotcha

Leaf nodes probably want to opt out of the cache, because if they are not cached, their outputs are not saved again.

If you run the same graph multiple times, you only end up with a single image output, because the image storage side-effects are in the `invoke()` method, which is bypassed if we have a cache hit.

## Linear UI

The linear graphs _almost_ just work, but due to the gotcha, we need to be careful about the final image-outputting node. To resolve this, a `SaveImageInvocation` node is added and used in the linear graphs.

This node is similar to `ImagePrimitive`, except it saves a copy of its input image, and has `use_cache` set to `False` by default.

This is now the leaf node in all linear graphs, and is the only node in those graphs with `use_cache == False` _and_ the only node with `is_intermedate == False`.

## Workflow Editor

All nodes now have a footer with a new `Use Cache [ ]` checkbox. It defaults to the value set by the invocation in its python definition, but can be changed by the user.

The workflow/node validation logic has been updated to migrate old workflows to use the new default values for `use_cache`. Users may still want to review the settings that have been chosen. In the event of catastrophic failure when running this migration, the default value of `True` is applied, as this is correct for most nodes.

Users should consider saving their workflows after loading them in and having them updated.

## Future Enhancements - Callback

A future enhancement would be to provide a callback to the `use_cache` flag that would be run as the node is executed to determine, based on its own internal state, if the cache should be used or not.

This would be useful for `DynamicPromptInvocation`, where the deterministic behaviour is determined by the `combinatorial: bool` field.

## Future Enhancements - Persisted Cache

Similar to how the latents storage is backed by disk, the invocation cache could be persisted to the database or disk. We'd need to be very careful about deserializing outputs, but it's perhaps worth exploring in the future.

* fix(ui): fix queue list item width

* feat(nodes): do not send the whole node on every generator progress

* feat(ui): strip out old logic related to sessions

Things like `isProcessing` are no longer relevant with queue. Removed them all & updated everything be appropriate for queue. May be a few little quirks I've missed...

* feat(ui): fix up param collapse labels

* feat(ui): click queue count to go to queue tab

* tidy(queue): update comment, query format

* feat(ui): fix progress bar when canceling

* fix(ui): fix circular dependency

* feat(nodes): bail on node caching logic if `node_cache_size == 0`

* feat(nodes): handle KeyError on node cache pop

* feat(nodes): bypass cache codepath if caches is disabled

more better no do thing

* fix(ui): reset api cache on connect/disconnect

* feat(ui): prevent enqueue when no prompts generated

* feat(ui): add queue controls to workflow editor

* feat(ui): update floating buttons & other incidental UI tweaks

* fix(ui): fix missing/incorrect translation keys

* fix(tests): add config service to mock invocation services

invoking needs access to `node_cache_size` to occur

* optionally remove pause/resume buttons from queue UI

* option to disable prepending

* chore(ui): remove unused file

* feat(queue): remove `order_id` entirely, `item_id` is now an autoinc pk

---------

Co-authored-by: Mary Hipp <maryhipp@Marys-MacBook-Air.local>
This commit is contained in:
psychedelicious
2023-09-20 15:09:24 +10:00
committed by GitHub
parent 977e348a35
commit b7938d9ca9
271 changed files with 9294 additions and 3050 deletions

View File

@ -0,0 +1,112 @@
from abc import ABC, abstractmethod
from typing import Optional
from invokeai.app.services.graph import Graph
from invokeai.app.services.session_queue.session_queue_common import (
QUEUE_ITEM_STATUS,
Batch,
BatchStatus,
CancelByBatchIDsResult,
CancelByQueueIDResult,
ClearResult,
EnqueueBatchResult,
EnqueueGraphResult,
IsEmptyResult,
IsFullResult,
PruneResult,
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueStatus,
)
from invokeai.app.services.shared.models import CursorPaginatedResults
class SessionQueueBase(ABC):
"""Base class for session queue"""
@abstractmethod
def dequeue(self) -> Optional[SessionQueueItem]:
"""Dequeues the next session queue item."""
pass
@abstractmethod
def enqueue_graph(self, queue_id: str, graph: Graph, prepend: bool) -> EnqueueGraphResult:
"""Enqueues a single graph for execution."""
pass
@abstractmethod
def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
"""Enqueues all permutations of a batch for execution."""
pass
@abstractmethod
def get_current(self, queue_id: str) -> Optional[SessionQueueItem]:
"""Gets the currently-executing session queue item"""
pass
@abstractmethod
def get_next(self, queue_id: str) -> Optional[SessionQueueItem]:
"""Gets the next session queue item (does not dequeue it)"""
pass
@abstractmethod
def clear(self, queue_id: str) -> ClearResult:
"""Deletes all session queue items"""
pass
@abstractmethod
def prune(self, queue_id: str) -> PruneResult:
"""Deletes all completed and errored session queue items"""
pass
@abstractmethod
def is_empty(self, queue_id: str) -> IsEmptyResult:
"""Checks if the queue is empty"""
pass
@abstractmethod
def is_full(self, queue_id: str) -> IsFullResult:
"""Checks if the queue is empty"""
pass
@abstractmethod
def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
"""Gets the status of the queue"""
pass
@abstractmethod
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
"""Gets the status of a batch"""
pass
@abstractmethod
def cancel_queue_item(self, item_id: int) -> SessionQueueItem:
"""Cancels a session queue item"""
pass
@abstractmethod
def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult:
"""Cancels all queue items with matching batch IDs"""
pass
@abstractmethod
def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
"""Cancels all queue items with matching queue ID"""
pass
@abstractmethod
def list_queue_items(
self,
queue_id: str,
limit: int,
priority: int,
cursor: Optional[int] = None,
status: Optional[QUEUE_ITEM_STATUS] = None,
) -> CursorPaginatedResults[SessionQueueItemDTO]:
"""Gets a page of session queue items"""
pass
@abstractmethod
def get_queue_item(self, item_id: int) -> SessionQueueItem:
"""Gets a session queue item by ID"""
pass

View File

@ -0,0 +1,418 @@
import datetime
import json
from itertools import chain, product
from typing import Generator, Iterable, Literal, NamedTuple, Optional, TypeAlias, Union, cast
from pydantic import BaseModel, Field, StrictStr, parse_raw_as, root_validator, validator
from pydantic.json import pydantic_encoder
from invokeai.app.invocations.baseinvocation import BaseInvocation
from invokeai.app.services.graph import Graph, GraphExecutionState, NodeNotFoundError
from invokeai.app.util.misc import uuid_string
# region Errors
class BatchZippedLengthError(ValueError):
"""Raise when a batch has items of different lengths."""
class BatchItemsTypeError(TypeError):
"""Raise when a batch has items of different types."""
class BatchDuplicateNodeFieldError(ValueError):
"""Raise when a batch has duplicate node_path and field_name."""
class TooManySessionsError(ValueError):
"""Raise when too many sessions are requested."""
class SessionQueueItemNotFoundError(ValueError):
"""Raise when a queue item is not found."""
# endregion
# region Batch
BatchDataType = Union[
StrictStr,
float,
int,
]
class NodeFieldValue(BaseModel):
node_path: str = Field(description="The node into which this batch data item will be substituted.")
field_name: str = Field(description="The field into which this batch data item will be substituted.")
value: BatchDataType = Field(description="The value to substitute into the node/field.")
class BatchDatum(BaseModel):
node_path: str = Field(description="The node into which this batch data collection will be substituted.")
field_name: str = Field(description="The field into which this batch data collection will be substituted.")
items: list[BatchDataType] = Field(
default_factory=list, description="The list of items to substitute into the node/field."
)
BatchDataCollection: TypeAlias = list[list[BatchDatum]]
class Batch(BaseModel):
batch_id: str = Field(default_factory=uuid_string, description="The ID of the batch")
data: Optional[BatchDataCollection] = Field(default=None, description="The batch data collection.")
graph: Graph = Field(description="The graph to initialize the session with")
runs: int = Field(
default=1, ge=1, description="Int stating how many times to iterate through all possible batch indices"
)
@validator("data")
def validate_lengths(cls, v: Optional[BatchDataCollection]):
if v is None:
return v
for batch_data_list in v:
first_item_length = len(batch_data_list[0].items) if batch_data_list and batch_data_list[0].items else 0
for i in batch_data_list:
if len(i.items) != first_item_length:
raise BatchZippedLengthError("Zipped batch items must all have the same length")
return v
@validator("data")
def validate_types(cls, v: Optional[BatchDataCollection]):
if v is None:
return v
for batch_data_list in v:
for datum in batch_data_list:
# Get the type of the first item in the list
first_item_type = type(datum.items[0]) if datum.items else None
for item in datum.items:
if type(item) is not first_item_type:
raise BatchItemsTypeError("All items in a batch must have the same type")
return v
@validator("data")
def validate_unique_field_mappings(cls, v: Optional[BatchDataCollection]):
if v is None:
return v
paths: set[tuple[str, str]] = set()
for batch_data_list in v:
for datum in batch_data_list:
pair = (datum.node_path, datum.field_name)
if pair in paths:
raise BatchDuplicateNodeFieldError("Each batch data must have unique node_id and field_name")
paths.add(pair)
return v
@root_validator(skip_on_failure=True)
def validate_batch_nodes_and_edges(cls, values):
batch_data_collection = cast(Optional[BatchDataCollection], values["data"])
if batch_data_collection is None:
return values
graph = cast(Graph, values["graph"])
for batch_data_list in batch_data_collection:
for batch_data in batch_data_list:
try:
node = cast(BaseInvocation, graph.get_node(batch_data.node_path))
except NodeNotFoundError:
raise NodeNotFoundError(f"Node {batch_data.node_path} not found in graph")
if batch_data.field_name not in node.__fields__:
raise NodeNotFoundError(f"Field {batch_data.field_name} not found in node {batch_data.node_path}")
return values
class Config:
schema_extra = {
"required": [
"graph",
"runs",
]
}
# endregion Batch
# region Queue Items
DEFAULT_QUEUE_ID = "default"
QUEUE_ITEM_STATUS = Literal["pending", "in_progress", "completed", "failed", "canceled"]
def get_field_values(queue_item_dict: dict) -> Optional[list[NodeFieldValue]]:
field_values_raw = queue_item_dict.get("field_values", None)
return parse_raw_as(list[NodeFieldValue], field_values_raw) if field_values_raw is not None else None
def get_session(queue_item_dict: dict) -> GraphExecutionState:
session_raw = queue_item_dict.get("session", "{}")
return parse_raw_as(GraphExecutionState, session_raw)
class SessionQueueItemWithoutGraph(BaseModel):
"""Session queue item without the full graph. Used for serialization."""
item_id: int = Field(description="The identifier of the session queue item")
status: QUEUE_ITEM_STATUS = Field(default="pending", description="The status of this queue item")
priority: int = Field(default=0, description="The priority of this queue item")
batch_id: str = Field(description="The ID of the batch associated with this queue item")
session_id: str = Field(
description="The ID of the session associated with this queue item. The session doesn't exist in graph_executions until the queue item is executed."
)
field_values: Optional[list[NodeFieldValue]] = Field(
default=None, description="The field values that were used for this queue item"
)
queue_id: str = Field(description="The id of the queue with which this item is associated")
error: Optional[str] = Field(default=None, description="The error message if this queue item errored")
created_at: Union[datetime.datetime, str] = Field(description="When this queue item was created")
updated_at: Union[datetime.datetime, str] = Field(description="When this queue item was updated")
started_at: Optional[Union[datetime.datetime, str]] = Field(description="When this queue item was started")
completed_at: Optional[Union[datetime.datetime, str]] = Field(description="When this queue item was completed")
@classmethod
def from_dict(cls, queue_item_dict: dict) -> "SessionQueueItemDTO":
# must parse these manually
queue_item_dict["field_values"] = get_field_values(queue_item_dict)
return SessionQueueItemDTO(**queue_item_dict)
class Config:
schema_extra = {
"required": [
"item_id",
"status",
"batch_id",
"queue_id",
"session_id",
"priority",
"session_id",
"created_at",
"updated_at",
]
}
class SessionQueueItemDTO(SessionQueueItemWithoutGraph):
pass
class SessionQueueItem(SessionQueueItemWithoutGraph):
session: GraphExecutionState = Field(description="The fully-populated session to be executed")
@classmethod
def from_dict(cls, queue_item_dict: dict) -> "SessionQueueItem":
# must parse these manually
queue_item_dict["field_values"] = get_field_values(queue_item_dict)
queue_item_dict["session"] = get_session(queue_item_dict)
return SessionQueueItem(**queue_item_dict)
class Config:
schema_extra = {
"required": [
"item_id",
"status",
"batch_id",
"queue_id",
"session_id",
"session",
"priority",
"session_id",
"created_at",
"updated_at",
]
}
# endregion Queue Items
# region Query Results
class SessionQueueStatus(BaseModel):
queue_id: str = Field(..., description="The ID of the queue")
item_id: Optional[int] = Field(description="The current queue item id")
batch_id: Optional[str] = Field(description="The current queue item's batch id")
session_id: Optional[str] = Field(description="The current queue item's session id")
pending: int = Field(..., description="Number of queue items with status 'pending'")
in_progress: int = Field(..., description="Number of queue items with status 'in_progress'")
completed: int = Field(..., description="Number of queue items with status 'complete'")
failed: int = Field(..., description="Number of queue items with status 'error'")
canceled: int = Field(..., description="Number of queue items with status 'canceled'")
total: int = Field(..., description="Total number of queue items")
class BatchStatus(BaseModel):
queue_id: str = Field(..., description="The ID of the queue")
batch_id: str = Field(..., description="The ID of the batch")
pending: int = Field(..., description="Number of queue items with status 'pending'")
in_progress: int = Field(..., description="Number of queue items with status 'in_progress'")
completed: int = Field(..., description="Number of queue items with status 'complete'")
failed: int = Field(..., description="Number of queue items with status 'error'")
canceled: int = Field(..., description="Number of queue items with status 'canceled'")
total: int = Field(..., description="Total number of queue items")
class EnqueueBatchResult(BaseModel):
queue_id: str = Field(description="The ID of the queue")
enqueued: int = Field(description="The total number of queue items enqueued")
requested: int = Field(description="The total number of queue items requested to be enqueued")
batch: Batch = Field(description="The batch that was enqueued")
priority: int = Field(description="The priority of the enqueued batch")
class EnqueueGraphResult(BaseModel):
enqueued: int = Field(description="The total number of queue items enqueued")
requested: int = Field(description="The total number of queue items requested to be enqueued")
batch: Batch = Field(description="The batch that was enqueued")
priority: int = Field(description="The priority of the enqueued batch")
queue_item: SessionQueueItemDTO = Field(description="The queue item that was enqueued")
class ClearResult(BaseModel):
"""Result of clearing the session queue"""
deleted: int = Field(..., description="Number of queue items deleted")
class PruneResult(ClearResult):
"""Result of pruning the session queue"""
pass
class CancelByBatchIDsResult(BaseModel):
"""Result of canceling by list of batch ids"""
canceled: int = Field(..., description="Number of queue items canceled")
class CancelByQueueIDResult(CancelByBatchIDsResult):
"""Result of canceling by queue id"""
pass
class IsEmptyResult(BaseModel):
"""Result of checking if the session queue is empty"""
is_empty: bool = Field(..., description="Whether the session queue is empty")
class IsFullResult(BaseModel):
"""Result of checking if the session queue is full"""
is_full: bool = Field(..., description="Whether the session queue is full")
# endregion Query Results
# region Util
def populate_graph(graph: Graph, node_field_values: Iterable[NodeFieldValue]) -> Graph:
"""
Populates the given graph with the given batch data items.
"""
graph_clone = graph.copy(deep=True)
for item in node_field_values:
node = graph_clone.get_node(item.node_path)
if node is None:
continue
setattr(node, item.field_name, item.value)
graph_clone.update_node(item.node_path, node)
return graph_clone
def create_session_nfv_tuples(
batch: Batch, maximum: int
) -> Generator[tuple[GraphExecutionState, list[NodeFieldValue]], None, None]:
"""
Create all graph permutations from the given batch data and graph. Yields tuples
of the form (graph, batch_data_items) where batch_data_items is the list of BatchDataItems
that was applied to the graph.
"""
# TODO: Should this be a class method on Batch?
data: list[list[tuple[NodeFieldValue]]] = []
batch_data_collection = batch.data if batch.data is not None else []
for batch_datum_list in batch_data_collection:
# each batch_datum_list needs to be convered to NodeFieldValues and then zipped
node_field_values_to_zip: list[list[NodeFieldValue]] = []
for batch_datum in batch_datum_list:
node_field_values = [
NodeFieldValue(node_path=batch_datum.node_path, field_name=batch_datum.field_name, value=item)
for item in batch_datum.items
]
node_field_values_to_zip.append(node_field_values)
data.append(list(zip(*node_field_values_to_zip)))
# create generator to yield session,nfv tuples
count = 0
for _ in range(batch.runs):
for d in product(*data):
if count >= maximum:
return
flat_node_field_values = list(chain.from_iterable(d))
graph = populate_graph(batch.graph, flat_node_field_values)
yield (GraphExecutionState(graph=graph), flat_node_field_values)
count += 1
def calc_session_count(batch: Batch) -> int:
"""
Calculates the number of sessions that would be created by the batch, without incurring
the overhead of actually generating them. Adapted from `create_sessions().
"""
# TODO: Should this be a class method on Batch?
if not batch.data:
return batch.runs
data = []
for batch_datum_list in batch.data:
to_zip = []
for batch_datum in batch_datum_list:
batch_data_items = range(len(batch_datum.items))
to_zip.append(batch_data_items)
data.append(list(zip(*to_zip)))
data_product = list(product(*data))
return len(data_product) * batch.runs
class SessionQueueValueToInsert(NamedTuple):
"""A tuple of values to insert into the session_queue table"""
queue_id: str # queue_id
session: str # session json
session_id: str # session_id
batch_id: str # batch_id
field_values: Optional[str] # field_values json
priority: int # priority
ValuesToInsert: TypeAlias = list[SessionQueueValueToInsert]
def prepare_values_to_insert(queue_id: str, batch: Batch, priority: int, max_new_queue_items: int) -> ValuesToInsert:
values_to_insert: ValuesToInsert = []
for session, field_values in create_session_nfv_tuples(batch, max_new_queue_items):
# sessions must have unique id
session.id = uuid_string()
values_to_insert.append(
SessionQueueValueToInsert(
queue_id, # queue_id
session.json(), # session (json)
session.id, # session_id
batch.batch_id, # batch_id
# must use pydantic_encoder bc field_values is a list of models
json.dumps(field_values, default=pydantic_encoder) if field_values else None, # field_values (json)
priority, # priority
)
)
return values_to_insert
# endregion Util

View File

@ -0,0 +1,813 @@
import sqlite3
import threading
from typing import Optional, Union, cast
from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event as FastAPIEvent
from invokeai.app.services.events import EventServiceBase
from invokeai.app.services.graph import Graph
from invokeai.app.services.invoker import Invoker
from invokeai.app.services.session_queue.session_queue_base import SessionQueueBase
from invokeai.app.services.session_queue.session_queue_common import (
DEFAULT_QUEUE_ID,
QUEUE_ITEM_STATUS,
Batch,
BatchStatus,
CancelByBatchIDsResult,
CancelByQueueIDResult,
ClearResult,
EnqueueBatchResult,
EnqueueGraphResult,
IsEmptyResult,
IsFullResult,
PruneResult,
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueItemNotFoundError,
SessionQueueStatus,
calc_session_count,
prepare_values_to_insert,
)
from invokeai.app.services.shared.models import CursorPaginatedResults
class SqliteSessionQueue(SessionQueueBase):
__invoker: Invoker
__conn: sqlite3.Connection
__cursor: sqlite3.Cursor
__lock: threading.Lock
def start(self, invoker: Invoker) -> None:
self.__invoker = invoker
self._set_in_progress_to_canceled()
prune_result = self.prune(DEFAULT_QUEUE_ID)
local_handler.register(event_name=EventServiceBase.queue_event, _func=self._on_session_event)
self.__invoker.services.logger.info(f"Pruned {prune_result.deleted} finished queue items")
def __init__(self, conn: sqlite3.Connection, lock: threading.Lock) -> None:
super().__init__()
self.__conn = conn
# Enable row factory to get rows as dictionaries (must be done before making the cursor!)
self.__conn.row_factory = sqlite3.Row
self.__cursor = self.__conn.cursor()
self.__lock = lock
self._create_tables()
def _match_event_name(self, event: FastAPIEvent, match_in: list[str]) -> bool:
return event[1]["event"] in match_in
async def _on_session_event(self, event: FastAPIEvent) -> FastAPIEvent:
event_name = event[1]["event"]
match event_name:
case "graph_execution_state_complete":
await self._handle_complete_event(event)
case "invocation_error" | "session_retrieval_error" | "invocation_retrieval_error":
await self._handle_error_event(event)
case "session_canceled":
await self._handle_cancel_event(event)
return event
async def _handle_complete_event(self, event: FastAPIEvent) -> None:
try:
item_id = event[1]["data"]["queue_item_id"]
# When a queue item has an error, we get an error event, then a completed event.
# Mark the queue item completed only if it isn't already marked completed, e.g.
# by a previously-handled error event.
queue_item = self.get_queue_item(item_id)
if queue_item.status not in ["completed", "failed", "canceled"]:
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="completed")
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
except SessionQueueItemNotFoundError:
return
async def _handle_error_event(self, event: FastAPIEvent) -> None:
try:
item_id = event[1]["data"]["queue_item_id"]
error = event[1]["data"]["error"]
queue_item = self.get_queue_item(item_id)
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="failed", error=error)
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
except SessionQueueItemNotFoundError:
return
async def _handle_cancel_event(self, event: FastAPIEvent) -> None:
try:
item_id = event[1]["data"]["queue_item_id"]
queue_item = self.get_queue_item(item_id)
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="canceled")
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
except SessionQueueItemNotFoundError:
return
def _create_tables(self) -> None:
"""Creates the session queue tables, indicies, and triggers"""
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS session_queue (
item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- used for ordering, cursor pagination
batch_id TEXT NOT NULL, -- identifier of the batch this queue item belongs to
queue_id TEXT NOT NULL, -- identifier of the queue this queue item belongs to
session_id TEXT NOT NULL UNIQUE, -- duplicated data from the session column, for ease of access
field_values TEXT, -- NULL if no values are associated with this queue item
session TEXT NOT NULL, -- the session to be executed
status TEXT NOT NULL DEFAULT 'pending', -- the status of the queue item, one of 'pending', 'in_progress', 'completed', 'failed', 'canceled'
priority INTEGER NOT NULL DEFAULT 0, -- the priority, higher is more important
error TEXT, -- any errors associated with this queue item
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), -- updated via trigger
started_at DATETIME, -- updated via trigger
completed_at DATETIME -- updated via trigger, completed items are cleaned up on application startup
-- Ideally this is a FK, but graph_executions uses INSERT OR REPLACE, and REPLACE triggers the ON DELETE CASCADE...
-- FOREIGN KEY (session_id) REFERENCES graph_executions (id) ON DELETE CASCADE
);
"""
)
self.__cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_item_id ON session_queue(item_id);
"""
)
self.__cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_session_id ON session_queue(session_id);
"""
)
self.__cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_batch_id ON session_queue(batch_id);
"""
)
self.__cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_created_priority ON session_queue(priority);
"""
)
self.__cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_created_status ON session_queue(status);
"""
)
self.__cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_completed_at
AFTER UPDATE OF status ON session_queue
FOR EACH ROW
WHEN
NEW.status = 'completed'
OR NEW.status = 'failed'
OR NEW.status = 'canceled'
BEGIN
UPDATE session_queue
SET completed_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = NEW.item_id;
END;
"""
)
self.__cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_started_at
AFTER UPDATE OF status ON session_queue
FOR EACH ROW
WHEN
NEW.status = 'in_progress'
BEGIN
UPDATE session_queue
SET started_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = NEW.item_id;
END;
"""
)
self.__cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_updated_at
AFTER UPDATE
ON session_queue FOR EACH ROW
BEGIN
UPDATE session_queue
SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = old.item_id;
END;
"""
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
def _set_in_progress_to_canceled(self) -> None:
"""
Sets all in_progress queue items to canceled. Run on app startup, not associated with any queue.
This is necessary because the invoker may have been killed while processing a queue item.
"""
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
UPDATE session_queue
SET status = 'canceled'
WHERE status = 'in_progress';
"""
)
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
def _get_current_queue_size(self, queue_id: str) -> int:
"""Gets the current number of pending queue items"""
self.__cursor.execute(
"""--sql
SELECT count(*)
FROM session_queue
WHERE
queue_id = ?
AND status = 'pending'
""",
(queue_id,),
)
return cast(int, self.__cursor.fetchone()[0])
def _get_highest_priority(self, queue_id: str) -> int:
"""Gets the highest priority value in the queue"""
self.__cursor.execute(
"""--sql
SELECT MAX(priority)
FROM session_queue
WHERE
queue_id = ?
AND status = 'pending'
""",
(queue_id,),
)
return cast(Union[int, None], self.__cursor.fetchone()[0]) or 0
def enqueue_graph(self, queue_id: str, graph: Graph, prepend: bool) -> EnqueueGraphResult:
enqueue_result = self.enqueue_batch(queue_id=queue_id, batch=Batch(graph=graph), prepend=prepend)
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT *
FROM session_queue
WHERE queue_id = ?
AND batch_id = ?
""",
(queue_id, enqueue_result.batch.batch_id),
)
result = cast(Union[sqlite3.Row, None], self.__cursor.fetchone())
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
if result is None:
raise SessionQueueItemNotFoundError(f"No queue item with batch id {enqueue_result.batch.batch_id}")
return EnqueueGraphResult(
**enqueue_result.dict(),
queue_item=SessionQueueItemDTO.from_dict(dict(result)),
)
def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
try:
self.__lock.acquire()
# TODO: how does this work in a multi-user scenario?
current_queue_size = self._get_current_queue_size(queue_id)
max_queue_size = self.__invoker.services.configuration.get_config().max_queue_size
max_new_queue_items = max_queue_size - current_queue_size
priority = 0
if prepend:
priority = self._get_highest_priority(queue_id) + 1
requested_count = calc_session_count(batch)
values_to_insert = prepare_values_to_insert(
queue_id=queue_id,
batch=batch,
priority=priority,
max_new_queue_items=max_new_queue_items,
)
enqueued_count = len(values_to_insert)
if requested_count > enqueued_count:
values_to_insert = values_to_insert[:max_new_queue_items]
self.__cursor.executemany(
"""--sql
INSERT INTO session_queue (queue_id, session, session_id, batch_id, field_values, priority)
VALUES (?, ?, ?, ?, ?, ?)
""",
values_to_insert,
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
enqueue_result = EnqueueBatchResult(
queue_id=queue_id,
requested=requested_count,
enqueued=enqueued_count,
batch=batch,
priority=priority,
)
self.__invoker.services.events.emit_batch_enqueued(enqueue_result)
return enqueue_result
def dequeue(self) -> Optional[SessionQueueItem]:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT *
FROM session_queue
WHERE status = 'pending'
ORDER BY
priority DESC,
item_id ASC
LIMIT 1
"""
)
result = cast(Union[sqlite3.Row, None], self.__cursor.fetchone())
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
if result is None:
return None
queue_item = SessionQueueItem.from_dict(dict(result))
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="in_progress")
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
return queue_item
def get_next(self, queue_id: str) -> Optional[SessionQueueItem]:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT *
FROM session_queue
WHERE
queue_id = ?
AND status = 'pending'
ORDER BY
priority DESC,
created_at ASC
LIMIT 1
""",
(queue_id,),
)
result = cast(Union[sqlite3.Row, None], self.__cursor.fetchone())
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
if result is None:
return None
return SessionQueueItem.from_dict(dict(result))
def get_current(self, queue_id: str) -> Optional[SessionQueueItem]:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT *
FROM session_queue
WHERE
queue_id = ?
AND status = 'in_progress'
LIMIT 1
""",
(queue_id,),
)
result = cast(Union[sqlite3.Row, None], self.__cursor.fetchone())
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
if result is None:
return None
return SessionQueueItem.from_dict(dict(result))
def _set_queue_item_status(
self, item_id: int, status: QUEUE_ITEM_STATUS, error: Optional[str] = None
) -> SessionQueueItem:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
UPDATE session_queue
SET status = ?, error = ?
WHERE item_id = ?
""",
(status, error, item_id),
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return self.get_queue_item(item_id)
def is_empty(self, queue_id: str) -> IsEmptyResult:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT count(*)
FROM session_queue
WHERE queue_id = ?
""",
(queue_id,),
)
is_empty = cast(int, self.__cursor.fetchone()[0]) == 0
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return IsEmptyResult(is_empty=is_empty)
def is_full(self, queue_id: str) -> IsFullResult:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT count(*)
FROM session_queue
WHERE queue_id = ?
""",
(queue_id,),
)
max_queue_size = self.__invoker.services.configuration.max_queue_size
is_full = cast(int, self.__cursor.fetchone()[0]) >= max_queue_size
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return IsFullResult(is_full=is_full)
def delete_queue_item(self, item_id: int) -> SessionQueueItem:
queue_item = self.get_queue_item(item_id=item_id)
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
DELETE FROM session_queue
WHERE
item_id = ?
""",
(item_id,),
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return queue_item
def clear(self, queue_id: str) -> ClearResult:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT COUNT(*)
FROM session_queue
WHERE queue_id = ?
""",
(queue_id,),
)
count = self.__cursor.fetchone()[0]
self.__cursor.execute(
"""--sql
DELETE
FROM session_queue
WHERE queue_id = ?
""",
(queue_id,),
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
self.__invoker.services.events.emit_queue_cleared(queue_id)
return ClearResult(deleted=count)
def prune(self, queue_id: str) -> PruneResult:
try:
where = """--sql
WHERE
queue_id = ?
AND (
status = 'completed'
OR status = 'failed'
OR status = 'canceled'
)
"""
self.__lock.acquire()
self.__cursor.execute(
f"""--sql
SELECT COUNT(*)
FROM session_queue
{where};
""",
(queue_id,),
)
count = self.__cursor.fetchone()[0]
self.__cursor.execute(
f"""--sql
DELETE
FROM session_queue
{where};
""",
(queue_id,),
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return PruneResult(deleted=count)
def cancel_queue_item(self, item_id: int) -> SessionQueueItem:
queue_item = self.get_queue_item(item_id)
if queue_item.status not in ["canceled", "failed", "completed"]:
queue_item = self._set_queue_item_status(item_id=item_id, status="canceled")
self.__invoker.services.queue.cancel(queue_item.session_id)
self.__invoker.services.events.emit_session_canceled(
queue_item_id=queue_item.item_id,
queue_id=queue_item.queue_id,
graph_execution_state_id=queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
return queue_item
def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult:
try:
current_queue_item = self.get_current(queue_id)
self.__lock.acquire()
placeholders = ", ".join(["?" for _ in batch_ids])
where = f"""--sql
WHERE
queue_id == ?
AND batch_id IN ({placeholders})
AND status != 'canceled'
AND status != 'completed'
AND status != 'failed'
"""
params = [queue_id] + batch_ids
self.__cursor.execute(
f"""--sql
SELECT COUNT(*)
FROM session_queue
{where};
""",
tuple(params),
)
count = self.__cursor.fetchone()[0]
self.__cursor.execute(
f"""--sql
UPDATE session_queue
SET status = 'canceled'
{where};
""",
tuple(params),
)
self.__conn.commit()
if current_queue_item is not None and current_queue_item.batch_id in batch_ids:
self.__invoker.services.queue.cancel(current_queue_item.session_id)
self.__invoker.services.events.emit_session_canceled(
queue_item_id=current_queue_item.item_id,
queue_id=current_queue_item.queue_id,
graph_execution_state_id=current_queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(current_queue_item)
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return CancelByBatchIDsResult(canceled=count)
def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
try:
current_queue_item = self.get_current(queue_id)
self.__lock.acquire()
where = """--sql
WHERE
queue_id is ?
AND status != 'canceled'
AND status != 'completed'
AND status != 'failed'
"""
params = [queue_id]
self.__cursor.execute(
f"""--sql
SELECT COUNT(*)
FROM session_queue
{where};
""",
tuple(params),
)
count = self.__cursor.fetchone()[0]
self.__cursor.execute(
f"""--sql
UPDATE session_queue
SET status = 'canceled'
{where};
""",
tuple(params),
)
self.__conn.commit()
if current_queue_item is not None and current_queue_item.queue_id == queue_id:
self.__invoker.services.queue.cancel(current_queue_item.session_id)
self.__invoker.services.events.emit_session_canceled(
queue_item_id=current_queue_item.item_id,
queue_id=current_queue_item.queue_id,
graph_execution_state_id=current_queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(current_queue_item)
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return CancelByQueueIDResult(canceled=count)
def get_queue_item(self, item_id: int) -> SessionQueueItem:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT * FROM session_queue
WHERE
item_id = ?
""",
(item_id,),
)
result = cast(Union[sqlite3.Row, None], self.__cursor.fetchone())
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
if result is None:
raise SessionQueueItemNotFoundError(f"No queue item with id {item_id}")
return SessionQueueItem.from_dict(dict(result))
def list_queue_items(
self,
queue_id: str,
limit: int,
priority: int,
cursor: Optional[int] = None,
status: Optional[QUEUE_ITEM_STATUS] = None,
) -> CursorPaginatedResults[SessionQueueItemDTO]:
try:
item_id = cursor
self.__lock.acquire()
query = """--sql
SELECT item_id,
status,
priority,
field_values,
error,
created_at,
updated_at,
completed_at,
started_at,
session_id,
batch_id,
queue_id
FROM session_queue
WHERE queue_id = ?
"""
params: list[Union[str, int]] = [queue_id]
if status is not None:
query += """--sql
AND status = ?
"""
params.append(status)
if item_id is not None:
query += """--sql
AND (priority < ?) OR (priority = ? AND item_id > ?)
"""
params.extend([priority, priority, item_id])
query += """--sql
ORDER BY
priority DESC,
item_id ASC
LIMIT ?
"""
params.append(limit + 1)
self.__cursor.execute(query, params)
results = cast(list[sqlite3.Row], self.__cursor.fetchall())
items = [SessionQueueItemDTO.from_dict(dict(result)) for result in results]
has_more = False
if len(items) > limit:
# remove the extra item
items.pop()
has_more = True
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return CursorPaginatedResults(items=items, limit=limit, has_more=has_more)
def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT status, count(*)
FROM session_queue
WHERE queue_id = ?
GROUP BY status
""",
(queue_id,),
)
counts_result = cast(list[sqlite3.Row], self.__cursor.fetchall())
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
current_item = self.get_current(queue_id=queue_id)
total = sum(row[1] for row in counts_result)
counts: dict[str, int] = {row[0]: row[1] for row in counts_result}
return SessionQueueStatus(
queue_id=queue_id,
item_id=current_item.item_id if current_item else None,
session_id=current_item.session_id if current_item else None,
batch_id=current_item.batch_id if current_item else None,
pending=counts.get("pending", 0),
in_progress=counts.get("in_progress", 0),
completed=counts.get("completed", 0),
failed=counts.get("failed", 0),
canceled=counts.get("canceled", 0),
total=total,
)
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
SELECT status, count(*)
FROM session_queue
WHERE
queue_id = ?
AND batch_id = ?
GROUP BY status
""",
(queue_id, batch_id),
)
result = cast(list[sqlite3.Row], self.__cursor.fetchall())
total = sum(row[1] for row in result)
counts: dict[str, int] = {row[0]: row[1] for row in result}
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
return BatchStatus(
batch_id=batch_id,
queue_id=queue_id,
pending=counts.get("pending", 0),
in_progress=counts.get("in_progress", 0),
completed=counts.get("completed", 0),
failed=counts.get("failed", 0),
canceled=counts.get("canceled", 0),
total=total,
)