working but filled with debug statements

Lincoln Stein 2024-04-01 18:44:24 -04:00
parent 3d69372785
commit 9adb15f86c
8 changed files with 72 additions and 44 deletions

View File

@@ -4,6 +4,8 @@ from logging import Logger
import torch
import invokeai.backend.util.devices # horrible hack
from invokeai.app.services.object_serializer.object_serializer_disk import ObjectSerializerDisk
from invokeai.app.services.object_serializer.object_serializer_forward_cache import ObjectSerializerForwardCache
from invokeai.app.services.shared.sqlite.sqlite_util import init_db
@@ -100,6 +102,9 @@ class ApiDependencies:
download_queue=download_queue_service,
events=events,
)
# horrible hack - remove
invokeai.backend.util.devices.RAM_CACHE = model_manager.load.ram_cache
names = SimpleNameService()
session_processor = DefaultSessionProcessor()
session_queue = SqliteSessionQueue(db=db)
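
The two added lines above wire the model manager's RAM cache into invokeai.backend.util.devices through a module-level global, so that choose_torch_device() can later ask the cache which GPU the calling thread has reserved. A minimal single-file sketch of that pattern; _FakeRamCache and choose_device are illustrative names, not InvokeAI code:

import threading

# Module-level slot, assigned once at startup by whoever owns the real object.
RAM_CACHE = None


class _FakeRamCache:
    """Illustrative stand-in that records a device per thread ident."""

    def __init__(self):
        self._devices = {}

    def assign(self, device):
        self._devices[threading.current_thread().ident] = device

    def get_execution_device(self):
        return self._devices[threading.current_thread().ident]


def choose_device(default="cpu"):
    # Mirror the fallback logic: consult the global if it has been wired up,
    # otherwise return a sensible default.
    try:
        return RAM_CACHE.get_execution_device()
    except (AttributeError, KeyError):
        return default


RAM_CACHE = _FakeRamCache()   # the "horrible hack" assignment done here in ApiDependencies
RAM_CACHE.assign("cuda:0")
print(choose_device())        # -> cuda:0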

View File

@@ -4,7 +4,7 @@ import math
from contextlib import ExitStack
from functools import singledispatchmethod
from typing import Any, Iterator, List, Literal, Optional, Tuple, Union
import threading
import einops
import numpy as np
import numpy.typing as npt
@@ -393,6 +393,11 @@ class DenoiseLatentsInvocation(BaseInvocation):
# flip all bits to have noise different from initial
generator=torch.Generator(device=unet.device).manual_seed(seed ^ 0xFFFFFFFF),
)
if conditioning_data.unconditioned_embeddings.embeds.device != conditioning_data.text_embeddings.embeds.device:
print(f'DEBUG; ERROR uc={conditioning_data.unconditioned_embeddings.embeds.device} c={conditioning_data.text_embeddings.embeds.device} unet={unet.device}, tid={threading.current_thread().ident}')
return conditioning_data
def create_pipeline(
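
The check added to DenoiseLatentsInvocation above only reports, with the thread id, when the unconditioned and text embeddings have ended up on different devices; it does not move them. A small self-contained illustration of the comparison being instrumented, using plain tensors rather than the conditioning objects (devices_agree is an illustrative helper):

import torch

def devices_agree(*tensors: torch.Tensor) -> bool:
    # True when every tensor lives on the same torch.device.
    return len({t.device for t in tensors}) == 1

uc = torch.zeros(1, 77, 768)        # CPU by default
c = torch.zeros(1, 77, 768)
print(devices_agree(uc, c))         # True

if torch.cuda.is_available():
    c = c.to("cuda:0")
    print(devices_agree(uc, c))     # False -- the condition the debug print reports
    uc = uc.to(c.device)            # one way to realign the pair before torch.cat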

View File

@@ -1,5 +1,6 @@
from queue import Queue
from typing import TYPE_CHECKING, Optional, TypeVar
import threading
from invokeai.app.services.object_serializer.object_serializer_base import ObjectSerializerBase
@@ -18,8 +19,8 @@ class ObjectSerializerForwardCache(ObjectSerializerBase[T]):
def __init__(self, underlying_storage: ObjectSerializerBase[T], max_cache_size: int = 20):
super().__init__()
self._underlying_storage = underlying_storage
self._cache: dict[str, T] = {}
self._cache_ids = Queue[str]()
self._cache: dict[int, dict[str, T]] = {}
self._cache_ids: dict[int, Queue[str]] = {}
self._max_cache_size = max_cache_size
def start(self, invoker: "Invoker") -> None:
@@ -54,12 +55,27 @@ class ObjectSerializerForwardCache(ObjectSerializerBase[T]):
del self._cache[name]
self._on_deleted(name)
def _get_tid_cache(self) -> dict[str, T]:
tid = threading.current_thread().ident
if tid not in self._cache:
self._cache[tid] = {}
return self._cache[tid]
def _get_tid_cache_ids(self) -> Queue[str]:
tid = threading.current_thread().ident
if tid not in self._cache_ids:
self._cache_ids[tid] = Queue[str]()
return self._cache_ids[tid]
def _get_cache(self, name: str) -> Optional[T]:
return None if name not in self._cache else self._cache[name]
cache = self._get_tid_cache()
return None if name not in cache else cache[name]
def _set_cache(self, name: str, data: T):
if name not in self._cache:
self._cache[name] = data
self._cache_ids.put(name)
if self._cache_ids.qsize() > self._max_cache_size:
self._cache.pop(self._cache_ids.get())
cache = self._get_tid_cache()
if name not in cache:
cache[name] = data
cache_ids = self._get_tid_cache_ids()
cache_ids.put(name)
if cache_ids.qsize() > self._max_cache_size:
cache.pop(cache_ids.get())
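
The forward cache is reshaped above from one shared dict and queue into per-thread dicts and queues keyed by threading.current_thread().ident, so that concurrent session threads keep separate entries and do not evict each other's objects. A standalone sketch of the keyed-by-ident variant with FIFO eviction; PerThreadBoundedCache is an illustrative name, not the service class:

import threading
from queue import Queue

class PerThreadBoundedCache:
    """Bounded cache whose entries and eviction order are tracked per thread."""

    def __init__(self, max_size: int = 20):
        self._max_size = max_size
        self._cache: dict[int, dict[str, object]] = {}
        self._ids: dict[int, Queue] = {}

    def _tid(self) -> int:
        return threading.current_thread().ident or 0

    def get(self, name: str):
        return self._cache.get(self._tid(), {}).get(name)

    def put(self, name: str, value: object) -> None:
        tid = self._tid()
        cache = self._cache.setdefault(tid, {})
        ids = self._ids.setdefault(tid, Queue())
        if name not in cache:
            cache[name] = value
            ids.put(name)
            if ids.qsize() > self._max_size:
                cache.pop(ids.get())       # evict the oldest entry, in this thread only

c = PerThreadBoundedCache(max_size=2)
c.put("a", 1); c.put("b", 2); c.put("c", 3)
print(c.get("a"), c.get("c"))              # None 3 -- "a" was evicted for this thread

threading.local() would express the same per-thread isolation without the explicit ident bookkeeping, at the cost of making it harder to inspect or clear another thread's entries from outside.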

View File

@@ -175,7 +175,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
session = self._session_worker_queue.get()
if self._cancel_event.is_set():
if session.item_id in self._sessions_to_cancel:
print("DEBUG: CANCEL")
continue
if profiler is not None:
@@ -183,7 +182,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
# reserve a GPU for this session - may block
with self._invoker.services.model_manager.load.ram_cache.reserve_execution_device() as gpu:
print(f"DEBUG: session {session.item_id} has reserved gpu {gpu}")
# Prepare invocations and take the first
with self._process_lock:
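
Because reserve_execution_device() is a context manager, the GPU is handed back even when the session body raises, which matters for a long-running worker loop. A tiny illustration of that guarantee with a stand-in context manager (reserve_gpu and released are hypothetical):

from contextlib import contextmanager

released = []

@contextmanager
def reserve_gpu():
    gpu = "cuda:0"            # stand-in for the device handed out by the cache
    try:
        yield gpu
    finally:
        released.append(gpu)  # always runs, mirroring the finally block in ModelCache

try:
    with reserve_gpu() as gpu:
        raise RuntimeError("session failed")
except RuntimeError:
    pass

print(released)               # ['cuda:0'] -- the device was released despite the failure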

View File

@@ -30,15 +30,11 @@ import torch
from invokeai.backend.model_manager import AnyModel, SubModelType
from invokeai.backend.model_manager.load.memory_snapshot import MemorySnapshot
from invokeai.backend.util.devices import choose_torch_device
from invokeai.backend.util.logging import InvokeAILogger
from .model_cache_base import CacheRecord, CacheStats, ModelCacheBase, ModelLockerBase
from .model_locker import ModelLocker
if choose_torch_device() == torch.device("mps"):
from torch import mps
# Maximum size of the cache, in gigs
# Default is roughly enough to hold three fp16 diffusers models in RAM simultaneously
DEFAULT_MAX_CACHE_SIZE = 6.0
@@ -130,6 +126,7 @@ class ModelCache(ModelCacheBase[AnyModel]):
assigned = [x for x, tid in self._execution_devices.items() if current_thread == tid]
if not assigned:
raise ValueError("No GPU has been reserved for the use of thread {current_thread}")
print(f'DEBUG: TID={current_thread}; owns {assigned[0]}')
return assigned[0]
@contextmanager
@@ -155,15 +152,16 @@ class ModelCache(ModelCacheBase[AnyModel]):
self._free_execution_device.acquire(timeout=timeout)
with self._device_lock:
free_device = [x for x, tid in self._execution_devices.items() if tid == 0]
print(f"DEBUG: execution devices = {self._execution_devices}")
self._execution_devices[free_device[0]] = current_thread
device = free_device[0]
# we are outside the lock region now
print(f'DEBUG: RESERVED {device} for TID {current_thread}')
try:
yield device
finally:
with self._device_lock:
print(f'DEBUG: RELEASED {device} for TID {current_thread}')
self._execution_devices[device] = 0
self._free_execution_device.release()
torch.cuda.empty_cache()
@@ -386,11 +384,6 @@ class ModelCache(ModelCacheBase[AnyModel]):
if self.stats:
self.stats.cleared = models_cleared
gc.collect()
torch.cuda.empty_cache()
if choose_torch_device() == torch.device("mps"):
mps.empty_cache()
self.logger.debug(f"After making room: cached_models={len(self._cached_models)}")
def _check_free_vram(self, target_device: torch.device, needed_size: int) -> None:
@@ -406,12 +399,12 @@ class ModelCache(ModelCacheBase[AnyModel]):
@staticmethod
def _get_execution_devices(devices: Optional[Set[torch.device]] = None) -> Set[torch.device]:
if not devices:
default_device = choose_torch_device()
if default_device != torch.device("cuda"):
devices = {default_device}
else:
# we get here if the default device is cuda, and return each of the cuda devices.
if torch.cuda.is_available():
devices = {torch.device(f"cuda:{x}") for x in range(0, torch.cuda.device_count())}
elif torch.backends.mps.is_available():
devices = {torch.device('mps')}
else:
devices = {torch.device('cpu')}
return devices
@staticmethod
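
Taken together, the ModelCache changes implement a small device pool: a dict mapping each execution device to the thread id that currently owns it (0 when free), a counting semaphore sized to the number of devices so reservations block while every GPU is busy, and a lock guarding the dict. A self-contained sketch of that pattern; DevicePool is an illustrative name and the exact semaphore type in the real cache is an assumption:

import threading
from contextlib import contextmanager

class DevicePool:
    """Sketch of the reserve/release bookkeeping used by the model cache."""

    def __init__(self, devices):
        self._owners = {d: 0 for d in devices}      # device -> owning thread id, 0 = free
        self._lock = threading.Lock()
        self._free = threading.BoundedSemaphore(len(devices))

    def current_device(self):
        tid = threading.current_thread().ident
        with self._lock:
            owned = [d for d, owner in self._owners.items() if owner == tid]
        if not owned:
            raise ValueError(f"No device has been reserved by thread {tid}")
        return owned[0]

    @contextmanager
    def reserve(self):
        tid = threading.current_thread().ident
        self._free.acquire()                        # block until some device is free
        with self._lock:
            device = next(d for d, owner in self._owners.items() if owner == 0)
            self._owners[device] = tid
        try:
            yield device
        finally:
            with self._lock:
                self._owners[device] = 0
            self._free.release()

pool = DevicePool(["cuda:0", "cuda:1"])

def worker():
    with pool.reserve() as dev:
        assert pool.current_device() == dev         # later lookups on this thread see the same device

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()

The real cache additionally empties the CUDA cache when a device is released, as the reserve_execution_device() hunk above shows.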

View File

@@ -414,6 +414,11 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
else:
attn_ctx = nullcontext()
# NOTE error is not here!
if conditioning_data.unconditioned_embeddings.embeds.device != \
conditioning_data.text_embeddings.embeds.device:
print('DEBUG; HERE IS THE ERROR 1')
with attn_ctx:
if callback is not None:
callback(
@@ -428,6 +433,10 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
# print("timesteps:", timesteps)
for i, t in enumerate(self.progress_bar(timesteps)):
if conditioning_data.unconditioned_embeddings.embeds.device != \
conditioning_data.text_embeddings.embeds.device:
print('DEBUG; HERE IS THE ERROR 2')
batched_t = t.expand(batch_size)
step_output = self.step(
batched_t,
@@ -472,6 +481,7 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
t2i_adapter_data: Optional[list[T2IAdapterData]] = None,
ip_adapter_unet_patcher: Optional[UNetPatcher] = None,
):
# invokeai_diffuser has batched timesteps, but diffusers schedulers expect a single value
timestep = t[0]
if additional_guidance is None:

View File

@@ -5,6 +5,7 @@ from contextlib import contextmanager
from typing import Any, Callable, Optional, Union
import torch
import threading
from diffusers import UNet2DConditionModel
from typing_extensions import TypeAlias
@@ -288,6 +289,8 @@ class InvokeAIDiffuserComponent:
unconditioning, encoder_attention_mask = _pad_conditioning(unconditioning, max_len, encoder_attention_mask)
conditioning, encoder_attention_mask = _pad_conditioning(conditioning, max_len, encoder_attention_mask)
if unconditioning.device != conditioning.device:
print(f'DEBUG: TID={threading.current_thread().ident}: Unconditioning device = {unconditioning.device}, conditioning device={conditioning.device}')
return torch.cat([unconditioning, conditioning]), encoder_attention_mask
# methods below are called from do_diffusion_step and should be considered private to this class.

View File

@@ -12,28 +12,26 @@ from invokeai.app.services.config.config_default import get_config
CPU_DEVICE = torch.device("cpu")
CUDA_DEVICE = torch.device("cuda")
MPS_DEVICE = torch.device("mps")
RAM_CACHE = None # horrible hack
def choose_torch_device() -> torch.device:
"""Convenience routine for guessing which GPU device to run model on."""
# """Temporarily modified to use the model manager's get_execution_device()"""
# try:
# from invokeai.app.api.dependencies import ApiDependencies
# model_manager = ApiDependencies.invoker.services.model_manager
# device = model_manager.load.ram_cache.acquire_execution_device()
# print(f'DEBUG choose_torch_device returning {device}')
# return device
# except Exception:
config = get_config()
if config.device == "auto":
if torch.cuda.is_available():
return torch.device("cuda")
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
return torch.device("mps")
"""Temporarily modified to use the model manager's get_execution_device()"""
global RAM_CACHE
try:
device = RAM_CACHE.get_execution_device()
return device
except (ValueError, AttributeError):
config = get_config()
if config.device == "auto":
if torch.cuda.is_available():
return torch.device("cuda")
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
return torch.device("mps")
else:
return CPU_DEVICE
else:
return CPU_DEVICE
else:
return torch.device(config.device)
return torch.device(config.device)
def get_torch_device_name() -> str:
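
With the RAM_CACHE global in place, choose_torch_device() now prefers whatever device the calling thread has reserved and only falls back to the config/auto-detection path when there is no cache or no reservation (get_execution_device() raises ValueError in that case). A compressed sketch of that decision order; choose_torch_device_sketch is an illustrative reimplementation, not the module's function:

import torch

RAM_CACHE = None   # wired up at startup, as in ApiDependencies

def choose_torch_device_sketch(configured: str = "auto") -> torch.device:
    # 1. Prefer the device reserved for this thread, if any.
    try:
        return RAM_CACHE.get_execution_device()
    except (ValueError, AttributeError):
        pass
    # 2. Otherwise fall back to configuration / auto-detection.
    if configured != "auto":
        return torch.device(configured)
    if torch.cuda.is_available():
        return torch.device("cuda")
    if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        return torch.device("mps")
    return torch.device("cpu")

print(choose_torch_device_sketch())   # cpu, cuda, or mps depending on the machine; no cache is wired up here

One behavioural consequence worth noting: inside a reservation the configured device is ignored entirely, which is what lets each worker thread land on its own GPU.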