InvokeAI/ldm/invoke/app/services/invoker.py
Kyle Schouviller 357601e2d6
parent 9eed1919c2
author Kyle Schouviller <kyle0654@hotmail.com> 1669872800 -0800
committer Kyle Schouviller <kyle0654@hotmail.com> 1676240900 -0800

Adding base node architecture

Fix type annotation errors

Runs and generates, but breaks in saving session

Fix default model value setting. Fix deprecation warning.

Fixed node api

Adding markdown docs

Simplifying Generate construction in apps

[nodes] A few minor changes (#2510)

* Pin api-related requirements

* Remove confusing extra CORS origins list

* Adds response models for HTTP 200

[nodes] Adding graph_execution_state to soon replace session. Adding tests with pytest.

Minor typing fixes

[nodes] Fix some small output query hookups

[node] Fixing some additional typing issues

[nodes] Move and expand graph code. Add base item storage and sqlite implementation.

Update startup to match new code

[nodes] Add callbacks to item storage

[nodes] Adding an InvocationContext object to use for invocations to provide easier extensibility

[nodes] New execution model that handles iteration

[nodes] Fixing the CLI

[nodes] Adding a note to the CLI

[nodes] Split processing thread into separate service

[node] Add error message on node processing failure

Removing old files and duplicated packages

Adding python-multipart
2023-02-26 21:28:00 +01:00

109 lines
3.8 KiB
Python

# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from abc import ABC
from threading import Event, Thread
from .graph import Graph, GraphExecutionState
from .item_storage import ItemStorageABC
from ..invocations.baseinvocation import InvocationContext
from .invocation_services import InvocationServices
from .invocation_queue import InvocationQueueABC, InvocationQueueItem
class InvokerServices:
    """Container for the services the Invoker uses to orchestrate execution.

    This class only stores references; it performs no work of its own.
    """

    # Queue that invocation queue items are put on.
    queue: InvocationQueueABC
    # Storage that graph execution states are saved to.
    graph_execution_manager: ItemStorageABC[GraphExecutionState]
    # Processor service (forward reference to a class declared later in this module).
    processor: 'InvocationProcessorABC'

    def __init__(
        self,
        queue: InvocationQueueABC,
        graph_execution_manager: ItemStorageABC[GraphExecutionState],
        processor: 'InvocationProcessorABC',
    ):
        """Store the given services on the instance.

        Args:
            queue: Queue that receives invocation queue items.
            graph_execution_manager: Storage for graph execution states.
            processor: Processor service.
        """
        # NOTE: keep this assignment order. The Invoker walks vars() of this
        # object, so insertion order determines service start/stop order.
        self.queue = queue
        self.graph_execution_manager = graph_execution_manager
        self.processor = processor
class Invoker:
    """Queues graph invocations for execution and starts/stops the attached services."""

    services: InvocationServices
    invoker_services: InvokerServices

    def __init__(
        self,
        services: InvocationServices,  # Services used by nodes to perform invocations
        invoker_services: InvokerServices,  # Services used by the invoker for orchestration
    ):
        """Store both service containers and immediately start their services."""
        self.services = services
        self.invoker_services = invoker_services
        self._start()

    def invoke(self, graph_execution_state: GraphExecutionState, invoke_all: bool = False) -> str|None:
        """Queue the next invocation of the given execution state.

        Args:
            graph_execution_state: State whose ``next()`` supplies the invocation.
            invoke_all: Passed through unchanged on the queued item.

        Returns:
            The id of the queued invocation, or None when ``next()`` yields
            nothing to execute.
        """
        next_invocation = graph_execution_state.next()
        if not next_invocation:
            return None

        # Save the state before queueing the invocation.
        self.invoker_services.graph_execution_manager.set(graph_execution_state)

        print(f'queueing item {next_invocation.id}')
        queue_item = InvocationQueueItem(
            graph_execution_state_id = graph_execution_state.id,
            invocation_id = next_invocation.id,
            invoke_all = invoke_all
        )
        self.invoker_services.queue.put(queue_item)

        return next_invocation.id

    def create_execution_state(self, graph: Graph|None = None) -> GraphExecutionState:
        """Build an execution state for ``graph``, store it, and return it.

        A fresh empty ``Graph()`` is used when ``graph`` is None.
        """
        if graph is None:
            graph = Graph()

        state = GraphExecutionState(graph = graph)
        self.invoker_services.graph_execution_manager.set(state)
        return state

    def __start_service(self, service) -> None:
        """Call ``service.start(self)`` if the service has a callable ``start``."""
        start_op = getattr(service, 'start', None)
        if not callable(start_op):
            return
        start_op(self)

    def __stop_service(self, service) -> None:
        """Call ``service.stop(self)`` if the service has a callable ``stop``."""
        stop_op = getattr(service, 'stop', None)
        if not callable(stop_op):
            return
        stop_op(self)

    def _start(self) -> None:
        """Start every service: invoker services first, then node services.

        Called automatically from ``__init__``.
        """
        for container in (self.invoker_services, self.services):
            for attr_name in vars(container):
                self.__start_service(getattr(container, attr_name))

    def stop(self) -> None:
        """Stop every service: node services first, then invoker services.

        A new invoker has to be created to execute further.
        """
        for container in (self.services, self.invoker_services):
            for attr_name in vars(container):
                self.__stop_service(getattr(container, attr_name))

        # NOTE(review): None is presumably a shutdown sentinel for whatever
        # consumes the queue; confirm against the processor implementation.
        self.invoker_services.queue.put(None)
class InvocationProcessorABC(ABC):
    """Base type for invocation processors; declares no members of its own."""