From 9adb15f86c4ac6da1b3f6a41e37e9834b21d2286 Mon Sep 17 00:00:00 2001
From: Lincoln Stein
Date: Mon, 1 Apr 2024 18:44:24 -0400
Subject: [PATCH] working but filled with debug statements

---
 invokeai/app/api/dependencies.py              |  5 +++
 invokeai/app/invocations/latent.py            |  7 +++-
 .../object_serializer_forward_cache.py        | 32 ++++++++++++-----
 .../session_processor_default.py              |  2 --
 .../load/model_cache/model_cache_default.py   | 23 +++++--------
 .../stable_diffusion/diffusers_pipeline.py    | 10 ++++++
 .../diffusion/shared_invokeai_diffusion.py    |  3 ++
 invokeai/backend/util/devices.py              | 34 +++++++++----------
 8 files changed, 72 insertions(+), 44 deletions(-)

diff --git a/invokeai/app/api/dependencies.py b/invokeai/app/api/dependencies.py
index 7332b35c08..995d08106a 100644
--- a/invokeai/app/api/dependencies.py
+++ b/invokeai/app/api/dependencies.py
@@ -4,6 +4,8 @@ from logging import Logger
 
 import torch
 
+import invokeai.backend.util.devices # horrible hack
+
 from invokeai.app.services.object_serializer.object_serializer_disk import ObjectSerializerDisk
 from invokeai.app.services.object_serializer.object_serializer_forward_cache import ObjectSerializerForwardCache
 from invokeai.app.services.shared.sqlite.sqlite_util import init_db
@@ -100,6 +102,9 @@ class ApiDependencies:
             download_queue=download_queue_service,
             events=events,
         )
+        # horrible hack - remove
+        invokeai.backend.util.devices.RAM_CACHE = model_manager.load.ram_cache
+
         names = SimpleNameService()
         session_processor = DefaultSessionProcessor()
         session_queue = SqliteSessionQueue(db=db)
diff --git a/invokeai/app/invocations/latent.py b/invokeai/app/invocations/latent.py
index 7845cbba03..9d73601d71 100644
--- a/invokeai/app/invocations/latent.py
+++ b/invokeai/app/invocations/latent.py
@@ -4,7 +4,7 @@ import math
 from contextlib import ExitStack
 from functools import singledispatchmethod
 from typing import Any, Iterator, List, Literal, Optional, Tuple, Union
-
+import threading
 import einops
 import numpy as np
 import numpy.typing as npt
@@ -393,6 +393,11 @@ class DenoiseLatentsInvocation(BaseInvocation):
             # flip all bits to have noise different from initial
             generator=torch.Generator(device=unet.device).manual_seed(seed ^ 0xFFFFFFFF),
         )
+
+        if conditioning_data.unconditioned_embeddings.embeds.device != conditioning_data.text_embeddings.embeds.device:
+            print(f'DEBUG; ERROR uc={conditioning_data.unconditioned_embeddings.embeds.device} c={conditioning_data.text_embeddings.embeds.device} unet={unet.device}, tid={threading.current_thread().ident}')
+
+
         return conditioning_data
 
     def create_pipeline(
diff --git a/invokeai/app/services/object_serializer/object_serializer_forward_cache.py b/invokeai/app/services/object_serializer/object_serializer_forward_cache.py
index b361259a4b..7d04d47d5c 100644
--- a/invokeai/app/services/object_serializer/object_serializer_forward_cache.py
+++ b/invokeai/app/services/object_serializer/object_serializer_forward_cache.py
@@ -1,5 +1,6 @@
 from queue import Queue
 from typing import TYPE_CHECKING, Optional, TypeVar
+import threading
 
 from invokeai.app.services.object_serializer.object_serializer_base import ObjectSerializerBase
 
@@ -18,8 +19,8 @@ class ObjectSerializerForwardCache(ObjectSerializerBase[T]):
     def __init__(self, underlying_storage: ObjectSerializerBase[T], max_cache_size: int = 20):
         super().__init__()
         self._underlying_storage = underlying_storage
-        self._cache: dict[str, T] = {}
-        self._cache_ids = Queue[str]()
+        self._cache: dict[int, dict[str, T]] = {}
+        self._cache_ids: dict[int, Queue[str]] = {}
         self._max_cache_size = max_cache_size
 
     def start(self, invoker: "Invoker") -> None:
@@ -54,12 +55,27 @@ class ObjectSerializerForwardCache(ObjectSerializerBase[T]):
         del self._cache[name]
         self._on_deleted(name)
 
+    def _get_tid_cache(self) -> dict[str, T]:
+        tid = threading.current_thread().ident
+        if tid not in self._cache:
+            self._cache[tid] = {}
+        return self._cache[tid]
+
+    def _get_tid_cache_ids(self) -> Queue[str]:
+        tid = threading.current_thread().ident
+        if tid not in self._cache_ids:
+            self._cache_ids[tid] = Queue[str]()
+        return self._cache_ids[tid]
+
     def _get_cache(self, name: str) -> Optional[T]:
-        return None if name not in self._cache else self._cache[name]
+        cache = self._get_tid_cache()
+        return None if name not in cache else cache[name]
 
     def _set_cache(self, name: str, data: T):
-        if name not in self._cache:
-            self._cache[name] = data
-            self._cache_ids.put(name)
-            if self._cache_ids.qsize() > self._max_cache_size:
-                self._cache.pop(self._cache_ids.get())
+        cache = self._get_tid_cache()
+        if name not in cache:
+            cache[name] = data
+            cache_ids = self._get_tid_cache_ids()
+            cache_ids.put(name)
+            if cache_ids.qsize() > self._max_cache_size:
+                cache.pop(cache_ids.get())
diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py
index fd198d0ff9..b1f3203fe0 100644
--- a/invokeai/app/services/session_processor/session_processor_default.py
+++ b/invokeai/app/services/session_processor/session_processor_default.py
@@ -175,7 +175,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
                 session = self._session_worker_queue.get()
                 if self._cancel_event.is_set():
                     if session.item_id in self._sessions_to_cancel:
-                        print("DEBUG: CANCEL")
                         continue
 
                 if profiler is not None:
@@ -183,7 +182,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
 
                 # reserve a GPU for this session - may block
                 with self._invoker.services.model_manager.load.ram_cache.reserve_execution_device() as gpu:
-                    print(f"DEBUG: session {session.item_id} has reserved gpu {gpu}")
 
                     # Prepare invocations and take the first
                     with self._process_lock:
diff --git a/invokeai/backend/model_manager/load/model_cache/model_cache_default.py b/invokeai/backend/model_manager/load/model_cache/model_cache_default.py
index 04cac01092..a6b7f9524d 100644
--- a/invokeai/backend/model_manager/load/model_cache/model_cache_default.py
+++ b/invokeai/backend/model_manager/load/model_cache/model_cache_default.py
@@ -30,15 +30,11 @@ import torch
 
 from invokeai.backend.model_manager import AnyModel, SubModelType
 from invokeai.backend.model_manager.load.memory_snapshot import MemorySnapshot
-from invokeai.backend.util.devices import choose_torch_device
 from invokeai.backend.util.logging import InvokeAILogger
 
 from .model_cache_base import CacheRecord, CacheStats, ModelCacheBase, ModelLockerBase
 from .model_locker import ModelLocker
 
-if choose_torch_device() == torch.device("mps"):
-    from torch import mps
-
 # Maximum size of the cache, in gigs
 # Default is roughly enough to hold three fp16 diffusers models in RAM simultaneously
 DEFAULT_MAX_CACHE_SIZE = 6.0
@@ -130,6 +126,7 @@ class ModelCache(ModelCacheBase[AnyModel]):
         assigned = [x for x, tid in self._execution_devices.items() if current_thread == tid]
         if not assigned:
             raise ValueError("No GPU has been reserved for the use of thread {current_thread}")
+        print(f'DEBUG: TID={current_thread}; owns {assigned[0]}')
        return assigned[0]
 
     @contextmanager
@@ -155,15 +152,16 @@
         self._free_execution_device.acquire(timeout=timeout)
         with self._device_lock:
             free_device = [x for x, tid in self._execution_devices.items() if tid == 0]
-            print(f"DEBUG: execution devices = {self._execution_devices}")
             self._execution_devices[free_device[0]] = current_thread
             device = free_device[0]
 
         # we are outside the lock region now
+        print(f'DEBUG: RESERVED {device} for TID {current_thread}')
         try:
             yield device
         finally:
             with self._device_lock:
+                print(f'DEBUG: RELEASED {device} for TID {current_thread}')
                 self._execution_devices[device] = 0
                 self._free_execution_device.release()
                 torch.cuda.empty_cache()
@@ -386,11 +384,6 @@ class ModelCache(ModelCacheBase[AnyModel]):
         if self.stats:
             self.stats.cleared = models_cleared
         gc.collect()
-
-        torch.cuda.empty_cache()
-        if choose_torch_device() == torch.device("mps"):
-            mps.empty_cache()
-
         self.logger.debug(f"After making room: cached_models={len(self._cached_models)}")
 
     def _check_free_vram(self, target_device: torch.device, needed_size: int) -> None:
@@ -406,12 +399,12 @@ class ModelCache(ModelCacheBase[AnyModel]):
     @staticmethod
     def _get_execution_devices(devices: Optional[Set[torch.device]] = None) -> Set[torch.device]:
         if not devices:
-            default_device = choose_torch_device()
-            if default_device != torch.device("cuda"):
-                devices = {default_device}
-            else:
-                # we get here if the default device is cuda, and return each of the cuda devices.
+            if torch.cuda.is_available():
                 devices = {torch.device(f"cuda:{x}") for x in range(0, torch.cuda.device_count())}
+            elif torch.backends.mps.is_available():
+                devices = {torch.device('mps')}
+            else:
+                devices = {torch.device('cpu')}
         return devices
 
     @staticmethod
diff --git a/invokeai/backend/stable_diffusion/diffusers_pipeline.py b/invokeai/backend/stable_diffusion/diffusers_pipeline.py
index dae55a0751..34e40daa1c 100644
--- a/invokeai/backend/stable_diffusion/diffusers_pipeline.py
+++ b/invokeai/backend/stable_diffusion/diffusers_pipeline.py
@@ -414,6 +414,11 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
         else:
             attn_ctx = nullcontext()
 
+        # NOTE error is not here!
+        if conditioning_data.unconditioned_embeddings.embeds.device != \
+           conditioning_data.text_embeddings.embeds.device:
+            print('DEBUG; HERE IS THE ERROR 1')
+
         with attn_ctx:
             if callback is not None:
                 callback(
@@ -428,6 +433,10 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
 
         # print("timesteps:", timesteps)
         for i, t in enumerate(self.progress_bar(timesteps)):
+            if conditioning_data.unconditioned_embeddings.embeds.device != \
+               conditioning_data.text_embeddings.embeds.device:
+                print('DEBUG; HERE IS THE ERROR 2')
+
             batched_t = t.expand(batch_size)
             step_output = self.step(
                 batched_t,
@@ -472,6 +481,7 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
         t2i_adapter_data: Optional[list[T2IAdapterData]] = None,
         ip_adapter_unet_patcher: Optional[UNetPatcher] = None,
     ):
+        # invokeai_diffuser has batched timesteps, but diffusers schedulers expect a single value
         timestep = t[0]
 
         if additional_guidance is None:
diff --git a/invokeai/backend/stable_diffusion/diffusion/shared_invokeai_diffusion.py b/invokeai/backend/stable_diffusion/diffusion/shared_invokeai_diffusion.py
index f55876623c..f4ca8d5418 100644
--- a/invokeai/backend/stable_diffusion/diffusion/shared_invokeai_diffusion.py
+++ b/invokeai/backend/stable_diffusion/diffusion/shared_invokeai_diffusion.py
@@ -5,6 +5,7 @@ from contextlib import contextmanager
 from typing import Any, Callable, Optional, Union
 
 import torch
+import threading
 from diffusers import UNet2DConditionModel
 from typing_extensions import TypeAlias
 
@@ -288,6 +289,8 @@ class InvokeAIDiffuserComponent:
             unconditioning, encoder_attention_mask = _pad_conditioning(unconditioning, max_len, encoder_attention_mask)
             conditioning, encoder_attention_mask = _pad_conditioning(conditioning, max_len, encoder_attention_mask)
 
+        if unconditioning.device != conditioning.device:
+            print(f'DEBUG: TID={threading.current_thread().ident}: Unconditioning device = {unconditioning.device}, conditioning device={conditioning.device}')
         return torch.cat([unconditioning, conditioning]), encoder_attention_mask
 
     # methods below are called from do_diffusion_step and should be considered private to this class.
diff --git a/invokeai/backend/util/devices.py b/invokeai/backend/util/devices.py
index e0fd0b1c9e..5aa8130689 100644
--- a/invokeai/backend/util/devices.py
+++ b/invokeai/backend/util/devices.py
@@ -12,28 +12,26 @@ from invokeai.app.services.config.config_default import get_config
 CPU_DEVICE = torch.device("cpu")
 CUDA_DEVICE = torch.device("cuda")
 MPS_DEVICE = torch.device("mps")
-
+RAM_CACHE = None # horrible hack
 
 def choose_torch_device() -> torch.device:
     """Convenience routine for guessing which GPU device to run model on."""
-    # """Temporarily modified to use the model manager's get_execution_device()"""
-    # try:
-    #     from invokeai.app.api.dependencies import ApiDependencies
-    #     model_manager = ApiDependencies.invoker.services.model_manager
-    #     device = model_manager.load.ram_cache.acquire_execution_device()
-    #     print(f'DEBUG choose_torch_device returning {device}')
-    #     return device
-    # except Exception:
-    config = get_config()
-    if config.device == "auto":
-        if torch.cuda.is_available():
-            return torch.device("cuda")
-        if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
-            return torch.device("mps")
+    """Temporarily modified to use the model manager's get_execution_device()"""
+    global RAM_CACHE
+    try:
+        device = RAM_CACHE.get_execution_device()
+        return device
+    except (ValueError, AttributeError):
+        config = get_config()
+        if config.device == "auto":
+            if torch.cuda.is_available():
+                return torch.device("cuda")
+            if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
+                return torch.device("mps")
+            else:
+                return CPU_DEVICE
         else:
-            return CPU_DEVICE
-    else:
-        return torch.device(config.device)
+            return torch.device(config.device)
 
 
 def get_torch_device_name() -> str:
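
For readers following the ModelCache changes above, here is a minimal standalone sketch of the reservation scheme that reserve_execution_device() implements: a semaphore gates access to the pool of GPUs, and a lock-protected map records which thread ident currently owns each device. This is an illustration only, not part of the patch; the DeviceReservation class name and the plain device strings are hypothetical stand-ins for the torch.device bookkeeping in model_cache_default.py.

    import threading
    from collections.abc import Iterator
    from contextlib import contextmanager


    class DeviceReservation:
        """Hands each worker thread exclusive use of one device at a time."""

        def __init__(self, devices: list[str]) -> None:
            # 0 means "unassigned"; otherwise the owning thread ident is stored
            self._assignments: dict[str, int] = {dev: 0 for dev in devices}
            self._lock = threading.Lock()
            # the semaphore makes callers block until some device is free
            self._free = threading.BoundedSemaphore(len(devices))

        @contextmanager
        def reserve(self, timeout: float = 60.0) -> Iterator[str]:
            tid = threading.current_thread().ident or 0
            if not self._free.acquire(timeout=timeout):
                raise TimeoutError("no execution device became free")
            with self._lock:
                device = next(d for d, owner in self._assignments.items() if owner == 0)
                self._assignments[device] = tid
            try:
                yield device
            finally:
                with self._lock:
                    self._assignments[device] = 0
                self._free.release()


    # usage: the session worker wraps one whole graph execution in a reservation
    # pool = DeviceReservation(["cuda:0", "cuda:1"])
    # with pool.reserve() as device:
    #     ...  # run the session on `device`

The release path in the patch additionally calls torch.cuda.empty_cache(); the sketch omits torch entirely to stay self-contained.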
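
The ObjectSerializerForwardCache changes follow the same keep-threads-apart idea: every cache structure is keyed by threading.current_thread().ident so that concurrent session workers never hand each other tensors that live on the wrong device. A condensed sketch of that pattern, illustration only and not part of the patch (the PerThreadForwardCache name is hypothetical):

    import threading
    from queue import Queue
    from typing import Generic, Optional, TypeVar

    T = TypeVar("T")


    class PerThreadForwardCache(Generic[T]):
        """FIFO-evicting cache whose entries are private to the calling thread."""

        def __init__(self, max_size: int = 20) -> None:
            self._max_size = max_size
            self._caches: dict[int, dict[str, T]] = {}
            self._ids: dict[int, Queue[str]] = {}

        def _tid(self) -> int:
            return threading.current_thread().ident or 0

        def get(self, name: str) -> Optional[T]:
            # a thread that has never written sees an empty cache and misses
            return self._caches.get(self._tid(), {}).get(name)

        def put(self, name: str, value: T) -> None:
            tid = self._tid()
            cache = self._caches.setdefault(tid, {})
            ids = self._ids.setdefault(tid, Queue())
            if name not in cache:
                cache[name] = value
                ids.put(name)
                if ids.qsize() > self._max_size:
                    cache.pop(ids.get())  # evict the oldest entry for this thread

On a miss the real class falls back to the underlying disk serializer, which matches the _get_cache()/_set_cache() split in the hunk above.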