InvokeAI/invokeai/app/services/invoker.py
psychedelicious 402cf9b0ee feat: refactor services folder/module structure
Refactor services folder/module structure.

**Motivation**

While working on our services I've repeatedly encountered circular imports and a general lack of clarity regarding where to put things. The structure introduced goes a long way towards resolving those issues, setting us up for a clean structure going forward.

**Services**

Services are now in their own folder with a few files:

- `services/{service_name}/__init__.py`: init as needed, mostly empty now
- `services/{service_name}/{service_name}_base.py`: the base class for the service
- `services/{service_name}/{service_name}_{impl_type}.py`: the default concrete implementation of the service - typically one of `sqlite`, `default`, or `memory`
- `services/{service_name}/{service_name}_common.py`: any common items - models, exceptions, utilities, etc

Though it's a bit verbose to have the service name both as the folder name and the prefix for files, I found it is _extremely_ confusing to have all of the base classes just be named `base.py`. So, at the cost of some verbosity when importing things, I've included the service name in the filename.

There are some minor logic changes. For example, in `InvocationProcessor`, instead of assigning the model manager service to a variable to be used later in the file, the service is used directly via the `Invoker`.

**Shared**

Things that are used across disparate services are in `services/shared/`:

- `default_graphs.py`: previously in `services/`
- `graphs.py`: previously in `services/`
- `paginatation`: generic pagination models used in a few services
- `sqlite`: the `SqliteDatabase` class, other sqlite-specific things
2023-10-12 12:15:06 -04:00

86 lines
3.0 KiB
Python

# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Optional
from .invocation_queue.invocation_queue_common import InvocationQueueItem
from .invocation_services import InvocationServices
from .shared.graph import Graph, GraphExecutionState
class Invoker:
"""The invoker, used to execute invocations"""
services: InvocationServices
def __init__(self, services: InvocationServices):
self.services = services
self._start()
def invoke(
self,
session_queue_id: str,
session_queue_item_id: int,
session_queue_batch_id: str,
graph_execution_state: GraphExecutionState,
invoke_all: bool = False,
) -> Optional[str]:
"""Determines the next node to invoke and enqueues it, preparing if needed.
Returns the id of the queued node, or `None` if there are no nodes left to enqueue."""
# Get the next invocation
invocation = graph_execution_state.next()
if not invocation:
return None
# Save the execution state
self.services.graph_execution_manager.set(graph_execution_state)
# Queue the invocation
self.services.queue.put(
InvocationQueueItem(
session_queue_id=session_queue_id,
session_queue_item_id=session_queue_item_id,
session_queue_batch_id=session_queue_batch_id,
graph_execution_state_id=graph_execution_state.id,
invocation_id=invocation.id,
invoke_all=invoke_all,
)
)
return invocation.id
def create_execution_state(self, graph: Optional[Graph] = None) -> GraphExecutionState:
"""Creates a new execution state for the given graph"""
new_state = GraphExecutionState(graph=Graph() if graph is None else graph)
self.services.graph_execution_manager.set(new_state)
return new_state
def cancel(self, graph_execution_state_id: str) -> None:
"""Cancels the given execution state"""
self.services.queue.cancel(graph_execution_state_id)
def __start_service(self, service) -> None:
# Call start() method on any services that have it
start_op = getattr(service, "start", None)
if callable(start_op):
start_op(self)
def __stop_service(self, service) -> None:
# Call stop() method on any services that have it
stop_op = getattr(service, "stop", None)
if callable(stop_op):
stop_op(self)
def _start(self) -> None:
"""Starts the invoker. This is called automatically when the invoker is created."""
for service in vars(self.services):
self.__start_service(getattr(self.services, service))
def stop(self) -> None:
"""Stops the invoker. A new invoker will have to be created to execute further."""
# First stop all services
for service in vars(self.services):
self.__stop_service(getattr(self.services, service))
self.services.queue.put(None)