From fb9b7fb63ad5aee1da347a1d3ce0793551234f6b Mon Sep 17 00:00:00 2001 From: Lincoln Stein Date: Tue, 16 Apr 2024 15:23:49 -0400 Subject: [PATCH] make object_serializer._new_name() thread-safe; add max_threads config --- invokeai/app/services/config/config_default.py | 4 ++-- .../object_serializer/object_serializer_disk.py | 6 ++++-- .../session_processor/session_processor_default.py | 5 ++++- invokeai/backend/util/devices.py | 12 +++++++++++- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/invokeai/app/services/config/config_default.py b/invokeai/app/services/config/config_default.py index 39dc1fe83b..895008b753 100644 --- a/invokeai/app/services/config/config_default.py +++ b/invokeai/app/services/config/config_default.py @@ -110,7 +110,7 @@ class InvokeAIAppConfig(BaseSettings): force_tiled_decode: Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty). pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting. max_queue_size: Maximum number of items in the session queue. - max_threads: Maximum number of session queue execution threads. + max_threads: Maximum number of session queue execution threads. Autocalculated from number of GPUs if not set. allow_nodes: List of nodes to allow. Omit to allow all. deny_nodes: List of nodes to deny. Omit to deny none. node_cache_size: How many cached nodes to keep in memory. @@ -182,7 +182,7 @@ class InvokeAIAppConfig(BaseSettings): force_tiled_decode: bool = Field(default=False, description="Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty).") pil_compress_level: int = Field(default=1, description="The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 
0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.") max_queue_size: int = Field(default=10000, gt=0, description="Maximum number of items in the session queue.") - max_threads: int = Field(default=4, description="Maximum number of session queue execution threads.") + max_threads: Optional[int] = Field(default=None, description="Maximum number of session queue execution threads. Autocalculated from number of GPUs if not set.") # NODES allow_nodes: Optional[list[str]] = Field(default=None, description="List of nodes to allow. Omit to allow all.") diff --git a/invokeai/app/services/object_serializer/object_serializer_disk.py b/invokeai/app/services/object_serializer/object_serializer_disk.py index 7e28320ff2..354a9b0c04 100644 --- a/invokeai/app/services/object_serializer/object_serializer_disk.py +++ b/invokeai/app/services/object_serializer/object_serializer_disk.py @@ -1,5 +1,5 @@ -import threading import tempfile +import threading import typing from dataclasses import dataclass from pathlib import Path @@ -72,7 +72,9 @@ class ObjectSerializerDisk(ObjectSerializerBase[T]): def _new_name(self) -> str: tid = threading.current_thread().ident - return f"{self._obj_class_name}_{tid}_{uuid_string()}" + # Add tid to the object name because uuid4 is not thread-safe on Windows + # See https://stackoverflow.com/questions/2759644/python-multiprocessing-doesnt-play-nicely-with-uuid-uuid4 + return f"{self._obj_class_name}_{tid}-{uuid_string()}" def _tempdir_cleanup(self) -> None: """Calls `cleanup` on the temporary directory, if it exists.""" diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index eb00caba5b..1860e4a5b3 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -16,6 +16,7 @@ from
invokeai.app.services.session_processor.session_processor_common import Can from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem from invokeai.app.services.shared.invocation_context import InvocationContextData, build_invocation_context from invokeai.app.util.profiler import Profiler +from invokeai.backend.util.devices import TorchDevice from ..invoker import Invoker from .session_processor_base import SessionProcessorBase @@ -40,7 +41,9 @@ class DefaultSessionProcessor(SessionProcessorBase): self._thread_semaphore = BoundedSemaphore(self._thread_limit) self._polling_interval = polling_interval - self._worker_thread_count = self._invoker.services.configuration.max_threads + self._worker_thread_count = self._invoker.services.configuration.max_threads or len( + TorchDevice.execution_devices() + ) self._session_worker_queue: Queue[SessionQueueItem] = Queue() self._process_lock = Lock() diff --git a/invokeai/backend/util/devices.py b/invokeai/backend/util/devices.py index b8cdec2ac3..b88206c5f7 100644 --- a/invokeai/backend/util/devices.py +++ b/invokeai/backend/util/devices.py @@ -1,4 +1,6 @@ -from typing import TYPE_CHECKING, Dict, Literal, Optional, Union +"""Torch Device class provides torch device selection services.""" + +from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union import torch from deprecated import deprecated @@ -69,6 +71,14 @@ class TorchDevice: device = CPU_DEVICE return cls.normalize(device) + @classmethod + def execution_devices(cls) -> List[torch.device]: + """Return a list of torch.devices that can be used for accelerated inference.""" + if cls._model_cache: + return cls._model_cache.execution_devices + else: + return [cls.choose_torch_device()] + @classmethod def choose_torch_dtype(cls, device: Optional[torch.device] = None) -> torch.dtype: """Return the precision to use for accelerated inference."""