InvokeAI/invokeai/app/api/dependencies.py
psychedelicious 2cb51bff11 refactor(nodes): merge processors
Consolidate graph processing logic into session processor.

With graphs as the unit of work, and the session queue distributing graphs, we no longer need the invocation queue or processor.

Instead, the session processor dequeues the next session and processes it in a simple loop, greatly simplifying the app.

- Remove `graph_execution_manager` service.
- Remove `queue` (invocation queue) service.
- Remove `processor` (invocation processor) service.
- Remove queue-related logic from `Invoker`. It now only starts and stops the services, providing them with access to other services.
- Remove unused `invocation_retrieval_error` and `session_retrieval_error` events, these are no longer needed.
- Clean up stats service now that it is less coupled to the rest of the app.
- Refactor cancellation logic - cancellations now originate from session queue (i.e. HTTP cancel endpoint) and are emitted as events. Processor gets the events and sets the canceled event. Access to this event is provided to the invocation context for e.g. the step callback.
- Remove `sessions` router; it provided access to `graph_executions` but that no longer exists.
2024-02-20 09:54:01 +11:00

139 lines
5.7 KiB
Python

# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from logging import Logger
import torch
from invokeai.app.services.object_serializer.object_serializer_disk import ObjectSerializerDisk
from invokeai.app.services.object_serializer.object_serializer_forward_cache import ObjectSerializerForwardCache
from invokeai.app.services.shared.sqlite.sqlite_util import init_db
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningFieldData
from invokeai.backend.util.logging import InvokeAILogger
from invokeai.version.invokeai_version import __version__
from ..services.board_image_records.board_image_records_sqlite import SqliteBoardImageRecordStorage
from ..services.board_images.board_images_default import BoardImagesService
from ..services.board_records.board_records_sqlite import SqliteBoardRecordStorage
from ..services.boards.boards_default import BoardService
from ..services.config import InvokeAIAppConfig
from ..services.download import DownloadQueueService
from ..services.image_files.image_files_disk import DiskImageFileStorage
from ..services.image_records.image_records_sqlite import SqliteImageRecordStorage
from ..services.images.images_default import ImageService
from ..services.invocation_cache.invocation_cache_memory import MemoryInvocationCache
from ..services.invocation_services import InvocationServices
from ..services.invocation_stats.invocation_stats_default import InvocationStatsService
from ..services.invoker import Invoker
from ..services.model_manager.model_manager_default import ModelManagerService
from ..services.model_metadata import ModelMetadataStoreSQL
from ..services.model_records import ModelRecordServiceSQL
from ..services.names.names_default import SimpleNameService
from ..services.session_processor.session_processor_default import DefaultSessionProcessor
from ..services.session_queue.session_queue_sqlite import SqliteSessionQueue
from ..services.urls.urls_default import LocalUrlService
from ..services.workflow_records.workflow_records_sqlite import SqliteWorkflowRecordsStorage
from .events import FastAPIEventService
# TODO: is there a better way to achieve this?
def check_internet() -> bool:
"""
Return true if the internet is reachable.
It does this by pinging huggingface.co.
"""
import urllib.request
host = "http://huggingface.co"
try:
urllib.request.urlopen(host, timeout=1)
return True
except Exception:
return False
logger = InvokeAILogger.get_logger()
class ApiDependencies:
"""Contains and initializes all dependencies for the API"""
invoker: Invoker
@staticmethod
def initialize(config: InvokeAIAppConfig, event_handler_id: int, logger: Logger = logger) -> None:
logger.info(f"InvokeAI version {__version__}")
logger.info(f"Root directory = {str(config.root_path)}")
logger.debug(f"Internet connectivity is {config.internet_available}")
output_folder = config.output_path
if output_folder is None:
raise ValueError("Output folder is not set")
image_files = DiskImageFileStorage(f"{output_folder}/images")
db = init_db(config=config, logger=logger, image_files=image_files)
configuration = config
logger = logger
board_image_records = SqliteBoardImageRecordStorage(db=db)
board_images = BoardImagesService()
board_records = SqliteBoardRecordStorage(db=db)
boards = BoardService()
events = FastAPIEventService(event_handler_id)
image_records = SqliteImageRecordStorage(db=db)
images = ImageService()
invocation_cache = MemoryInvocationCache(max_cache_size=config.node_cache_size)
tensors = ObjectSerializerForwardCache(
ObjectSerializerDisk[torch.Tensor](output_folder / "tensors", ephemeral=True)
)
conditioning = ObjectSerializerForwardCache(
ObjectSerializerDisk[ConditioningFieldData](output_folder / "conditioning", ephemeral=True)
)
download_queue_service = DownloadQueueService(event_bus=events)
model_metadata_service = ModelMetadataStoreSQL(db=db)
model_manager = ModelManagerService.build_model_manager(
app_config=configuration,
model_record_service=ModelRecordServiceSQL(db=db, metadata_store=model_metadata_service),
download_queue=download_queue_service,
events=events,
)
names = SimpleNameService()
performance_statistics = InvocationStatsService()
session_processor = DefaultSessionProcessor()
session_queue = SqliteSessionQueue(db=db)
urls = LocalUrlService()
workflow_records = SqliteWorkflowRecordsStorage(db=db)
services = InvocationServices(
board_image_records=board_image_records,
board_images=board_images,
board_records=board_records,
boards=boards,
configuration=configuration,
events=events,
image_files=image_files,
image_records=image_records,
images=images,
invocation_cache=invocation_cache,
logger=logger,
model_manager=model_manager,
download_queue=download_queue_service,
names=names,
performance_statistics=performance_statistics,
session_processor=session_processor,
session_queue=session_queue,
urls=urls,
workflow_records=workflow_records,
tensors=tensors,
conditioning=conditioning,
)
ApiDependencies.invoker = Invoker(services)
db.clean()
@staticmethod
def shutdown() -> None:
if ApiDependencies.invoker:
ApiDependencies.invoker.stop()