refactor(nodes): merge processors

Consolidate graph processing logic into session processor.

With graphs as the unit of work and the session queue distributing them, we no longer need the invocation queue or the invocation processor.

Instead, the session processor dequeues the next session and processes it in a simple loop, greatly simplifying the app.
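As a rough sketch of what the processor's loop now amounts to (the `dequeue` and `run_session` helpers and the polling interval here are hypothetical stand-ins, not the actual service code), the whole thing reduces to "take the next queued session and run it":

```python
import threading
from typing import Callable, Optional


def processor_loop(
    dequeue: Callable[[], Optional[object]],  # returns the next queued session, or None if the queue is empty
    run_session: Callable[[object, threading.Event], None],  # executes one session's graph to completion
    stop_event: threading.Event,  # set by the Invoker on shutdown
    poll_interval: float = 1.0,
) -> None:
    """Dequeue-and-run loop: no separate invocation queue or invocation processor."""
    while not stop_event.is_set():
        session = dequeue()
        if session is None:
            # Nothing queued; wait a moment before polling again.
            stop_event.wait(poll_interval)
            continue
        cancel_event = threading.Event()  # per-session cancellation flag
        run_session(session, cancel_event)
```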

- Remove `graph_execution_manager` service.
- Remove `queue` (invocation queue) service.
- Remove `processor` (invocation processor) service.
- Remove queue-related logic from `Invoker`. It now only starts and stops the services, providing them with access to other services.
- Remove the unused `invocation_retrieval_error` and `session_retrieval_error` events; they are no longer needed.
- Clean up stats service now that it is less coupled to the rest of the app.
- Refactor cancellation logic: cancellations now originate from the session queue (i.e. the HTTP cancel endpoint) and are emitted as events. The processor receives these events and sets the session's cancel event; the invocation context is given access to this event, e.g. for the step callback (see the sketch after this list).
- Remove `sessions` router; it provided access to `graph_executions` but that no longer exists.
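
To make the cancellation path concrete, here is a minimal sketch of the flow. The handler name, item IDs, and step callback below are illustrative assumptions; the real handlers live in the session processor and the step-callback util:

```python
import threading

# Per-running-session cancel flag, owned by the session processor.
_cancel_event = threading.Event()
_current_item_id = "item-123"  # illustrative ID of the queue item being processed


def on_queue_item_canceled(canceled_item_id: str) -> None:
    """Hypothetical handler for the cancel event emitted by the session queue
    (e.g. via the HTTP cancel endpoint): set the flag if it targets our item."""
    if canceled_item_id == _current_item_id:
        _cancel_event.set()


def is_canceled() -> bool:
    """The callable handed to the invocation context; step callbacks poll it."""
    return _cancel_event.is_set()


def step_callback(step: int, total_steps: int) -> None:
    """Illustrative denoising step callback: bail out early once canceled."""
    if is_canceled():
        raise RuntimeError("canceled")  # stand-in for a dedicated cancellation exception
    print(f"step {step}/{total_steps}")
```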

Author: psychedelicious
Date: 2024-02-18 01:41:04 +11:00
Parent: da9991e361
Commit: 725c03cf87
22 changed files with 227 additions and 879 deletions

@@ -1,6 +1,7 @@
+import threading
 from dataclasses import dataclass
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Callable, Optional
 
 from PIL.Image import Image
 from torch import Tensor
@@ -370,6 +371,12 @@ class ConfigInterface(InvocationContextInterface):
 
 
 class UtilInterface(InvocationContextInterface):
+    def __init__(
+        self, services: InvocationServices, context_data: InvocationContextData, is_canceled: Callable[[], bool]
+    ) -> None:
+        super().__init__(services, context_data)
+        self._is_canceled = is_canceled
+
     def sd_step_callback(self, intermediate_state: PipelineIntermediateState, base_model: BaseModelType) -> None:
         """
         The step callback emits a progress event with the current step, the total number of
@@ -390,8 +397,8 @@ class UtilInterface(InvocationContextInterface):
             context_data=self._context_data,
             intermediate_state=intermediate_state,
             base_model=base_model,
-            invocation_queue=self._services.queue,
             events=self._services.events,
+            is_canceled=self._is_canceled,
         )
@@ -412,6 +419,7 @@ class InvocationContext:
         boards: BoardsInterface,
         context_data: InvocationContextData,
         services: InvocationServices,
+        is_canceled: Callable[[], bool],
     ) -> None:
         self.images = images
         """Provides methods to save, get and update images and their metadata."""
@@ -433,11 +441,13 @@
         """Provides data about the current queue item and invocation. This is an internal API and may change without warning."""
         self._services = services
         """Provides access to the full application services. This is an internal API and may change without warning."""
+        self._is_canceled = is_canceled
 
 
 def build_invocation_context(
     services: InvocationServices,
     context_data: InvocationContextData,
+    cancel_event: threading.Event,
 ) -> InvocationContext:
     """
     Builds the invocation context for a specific invocation execution.
@@ -446,12 +456,15 @@ def build_invocation_context(
     :param invocation_context_data: The invocation context data.
     """
 
+    def is_canceled() -> bool:
+        return cancel_event.is_set()
+
     logger = LoggerInterface(services=services, context_data=context_data)
     images = ImagesInterface(services=services, context_data=context_data)
     tensors = TensorsInterface(services=services, context_data=context_data)
     models = ModelsInterface(services=services, context_data=context_data)
     config = ConfigInterface(services=services, context_data=context_data)
-    util = UtilInterface(services=services, context_data=context_data)
+    util = UtilInterface(services=services, context_data=context_data, is_canceled=is_canceled)
     conditioning = ConditioningInterface(services=services, context_data=context_data)
     boards = BoardsInterface(services=services, context_data=context_data)
 
@@ -466,6 +479,7 @@ def build_invocation_context(
         conditioning=conditioning,
         services=services,
         boards=boards,
+        is_canceled=is_canceled,
     )
 
     return ctx