diff --git a/invokeai/app/services/invocation_stats.py b/invokeai/app/services/invocation_stats.py
index 9d50375c09..484544e40d 100644
--- a/invokeai/app/services/invocation_stats.py
+++ b/invokeai/app/services/invocation_stats.py
@@ -43,6 +43,11 @@ import invokeai.backend.util.logging as logger
 from ..invocations.baseinvocation import BaseInvocation
 from .graph import GraphExecutionState
 from .item_storage import ItemStorageABC
+from .model_manager_service import ModelManagerService
+from invokeai.backend.model_management.model_cache import CacheStats
+
+# size of GIG in bytes
+GIG = 1073741824
 
 
 class InvocationStatsServiceBase(ABC):
@@ -84,14 +89,15 @@ class InvocationStatsServiceBase(ABC):
         pass
 
     @abstractmethod
-    def update_invocation_stats(self,
-                                graph_id: str,
-                                invocation_type: str,
-                                time_used: float,
-                                vram_used: float,
-                                ram_used: float,
-                                ram_changed: float,
-                                ):
+    def update_invocation_stats(
+        self,
+        graph_id: str,
+        invocation_type: str,
+        time_used: float,
+        vram_used: float,
+        ram_used: float,
+        ram_changed: float,
+    ):
         """
         Add timing information on execution of a node. Usually
         used internally.
@@ -119,6 +125,9 @@ class NodeStats:
     calls: int = 0
     time_used: float = 0.0  # seconds
     max_vram: float = 0.0  # GB
+    cache_hits: int = 0
+    cache_misses: int = 0
+    cache_high_watermark: int = 0
 
 
 @dataclass
@@ -137,36 +146,50 @@ class InvocationStatsService(InvocationStatsServiceBase):
         self.graph_execution_manager = graph_execution_manager
         # {graph_id => NodeLog}
         self._stats: Dict[str, NodeLog] = {}
+        self._cache_stats: Dict[str, CacheStats] = {}
 
     class StatsContext:
-        def __init__(self, invocation: BaseInvocation, graph_id: str, collector: "InvocationStatsServiceBase"):
+        """Context manager for collecting statistics."""
+
+        def __init__(
+            self,
+            invocation: BaseInvocation,
+            graph_id: str,
+            model_manager: ModelManagerService,
+            collector: "InvocationStatsServiceBase",
+        ):
+            """Initialize statistics for this run."""
             self.invocation = invocation
             self.collector = collector
             self.graph_id = graph_id
             self.start_time = 0
-            self.ram_info = None
+            self.ram_used = 0
+            self.model_manager = model_manager
 
         def __enter__(self):
             self.start_time = time.time()
             if torch.cuda.is_available():
                 torch.cuda.reset_peak_memory_stats()
-            self.ram_info = psutil.virtual_memory()
-
+            self.ram_used = psutil.Process().memory_info().rss
+            self.model_manager.collect_cache_stats(self.collector._cache_stats[self.graph_id])
 
         def __exit__(self, *args):
+            """Called on exit from the context."""
+            ram_used = psutil.Process().memory_info().rss
             self.collector.update_invocation_stats(
-                graph_id = self.graph_id,
-                invocation_type = self.invocation.type,
-                time_used = time.time() - self.start_time,
-                vram_used = torch.cuda.max_memory_allocated() / 1e9 if torch.cuda.is_available() else 0.0,
-                ram_used = psutil.virtual_memory().used / 1e9,
-                ram_changed = (psutil.virtual_memory().used - self.ram_info.used) / 1e9,
+                graph_id=self.graph_id,
+                invocation_type=self.invocation.type,
+                time_used=time.time() - self.start_time,
+                vram_used=torch.cuda.max_memory_allocated() / GIG if torch.cuda.is_available() else 0.0,
+                ram_used=ram_used / GIG,
+                ram_changed=(ram_used - self.ram_used) / GIG,
            )
 
     def collect_stats(
         self,
         invocation: BaseInvocation,
         graph_execution_state_id: str,
+        model_manager: ModelManagerService,
     ) -> StatsContext:
         """
         Return a context object that will capture the statistics.
@@ -175,7 +198,8 @@ class InvocationStatsService(InvocationStatsServiceBase):
         """
         if not self._stats.get(graph_execution_state_id):  # first time we're seeing this
             self._stats[graph_execution_state_id] = NodeLog()
-        return self.StatsContext(invocation, graph_execution_state_id, self)
+            self._cache_stats[graph_execution_state_id] = CacheStats()
+        return self.StatsContext(invocation, graph_execution_state_id, model_manager, self)
 
     def reset_all_stats(self):
         """Zero all statistics"""
@@ -188,14 +212,15 @@ class InvocationStatsService(InvocationStatsServiceBase):
         except KeyError:
             logger.warning(f"Attempted to clear statistics for unknown graph {graph_execution_id}")
 
-    def update_invocation_stats(self,
-                                graph_id: str,
-                                invocation_type: str,
-                                time_used: float,
-                                vram_used: float,
-                                ram_used: float,
-                                ram_changed: float,
-                                ):
+    def update_invocation_stats(
+        self,
+        graph_id: str,
+        invocation_type: str,
+        time_used: float,
+        vram_used: float,
+        ram_used: float,
+        ram_changed: float,
+    ):
         """
         Add timing information on execution of a node. Usually
         used internally.
@@ -218,7 +243,7 @@ class InvocationStatsService(InvocationStatsServiceBase):
     def log_stats(self):
         """
         Send the statistics to the system logger at the info level.
-        Stats will only be printed if when the execution of the graph
+        Stats will only be printed when the execution of the graph
         is complete.
         """
         completed = set()
@@ -235,11 +260,21 @@
                 total_time += stats.time_used
 
             logger.info(f"TOTAL GRAPH EXECUTION TIME: {total_time:7.3f}s")
-            logger.info("Current RAM used: " + "%4.2fG" % stats.ram_used + f" (delta={stats.ram_changed:4.2f}G)")
+            logger.info("RAM used: " + "%4.2fG" % stats.ram_used + f" (delta={stats.ram_changed:4.2f}G)")
             if torch.cuda.is_available():
-                logger.info("Current VRAM used: " + "%4.2fG" % (torch.cuda.memory_allocated() / 1e9))
+                logger.info("VRAM used (all processes): " + "%4.2fG" % (torch.cuda.memory_allocated() / GIG))
+            cache_stats = self._cache_stats[graph_id]
+            logger.info("RAM cache statistics:")
+            logger.info(f"   Model cache hits: {cache_stats.hits}")
+            logger.info(f"   Model cache misses: {cache_stats.misses}")
+            logger.info(f"   Models cached: {cache_stats.in_cache}")
+            logger.info(f"   Models cleared from cache: {cache_stats.cleared}")
+            hwm = cache_stats.high_watermark / GIG
+            tot = cache_stats.cache_size / GIG
+            logger.info(f"   Cache RAM usage: {hwm:4.2f}/{tot:4.2f}G")
 
             completed.add(graph_id)
 
         for graph_id in completed:
             del self._stats[graph_id]
+            del self._cache_stats[graph_id]
diff --git a/invokeai/app/services/model_manager_service.py b/invokeai/app/services/model_manager_service.py
index fd14e26364..675bc71257 100644
--- a/invokeai/app/services/model_manager_service.py
+++ b/invokeai/app/services/model_manager_service.py
@@ -22,6 +22,7 @@ from invokeai.backend.model_management import (
     ModelNotFoundException,
 )
 from invokeai.backend.model_management.model_search import FindModels
+from invokeai.backend.model_management.model_cache import CacheStats
 
 import torch
 from invokeai.app.models.exceptions import CanceledException
@@ -276,6 +277,13 @@ class ModelManagerServiceBase(ABC):
         """
         pass
 
+    @abstractmethod
+    def collect_cache_stats(self, cache_stats: CacheStats):
+        """
+        Reset model cache statistics and direct future stats into the given CacheStats object.
+ """ + pass + @abstractmethod def commit(self, conf_file: Optional[Path] = None) -> None: """ @@ -500,6 +508,12 @@ class ModelManagerService(ModelManagerServiceBase): self.logger.debug(f"convert model {model_name}") return self.mgr.convert_model(model_name, base_model, model_type, convert_dest_directory) + def collect_cache_stats(self, cache_stats: CacheStats): + """ + Reset model cache statistics for graph with graph_id. + """ + self.mgr.cache.stats = cache_stats + def commit(self, conf_file: Optional[Path] = None): """ Write current configuration out to the indicated file. diff --git a/invokeai/app/services/processor.py b/invokeai/app/services/processor.py index 41170a304b..6f35b3cb9e 100644 --- a/invokeai/app/services/processor.py +++ b/invokeai/app/services/processor.py @@ -86,7 +86,9 @@ class DefaultInvocationProcessor(InvocationProcessorABC): # Invoke try: - with statistics.collect_stats(invocation, graph_execution_state.id): + graph_id = graph_execution_state.id + model_manager = self.__invoker.services.model_manager + with statistics.collect_stats(invocation, graph_id, model_manager): outputs = invocation.invoke( InvocationContext( services=self.__invoker.services, diff --git a/invokeai/backend/model_management/model_cache.py b/invokeai/backend/model_management/model_cache.py index 2b8d020269..a610694d0e 100644 --- a/invokeai/backend/model_management/model_cache.py +++ b/invokeai/backend/model_management/model_cache.py @@ -21,12 +21,12 @@ import os import sys import hashlib from contextlib import suppress +from dataclasses import dataclass from pathlib import Path from typing import Dict, Union, types, Optional, Type, Any import torch -import logging import invokeai.backend.util.logging as logger from .models import BaseModelType, ModelType, SubModelType, ModelBase @@ -41,6 +41,16 @@ DEFAULT_MAX_VRAM_CACHE_SIZE = 2.75 GIG = 1073741824 +@dataclass +class CacheStats(object): + hits: int = 0 + misses: int = 0 + high_watermark: int = 0 + in_cache: int = 0 + cleared: int = 0 + cache_size: int = 0 + + class ModelLocker(object): "Forward declaration" pass @@ -115,6 +125,9 @@ class ModelCache(object): self.sha_chunksize = sha_chunksize self.logger = logger + # used for stats collection + self.stats = None + self._cached_models = dict() self._cache_stack = list() @@ -188,6 +201,8 @@ class ModelCache(object): self.logger.info( f"Loading model {model_path}, type {base_model.value}:{model_type.value}{':'+submodel.value if submodel else ''}" ) + if self.stats: + self.stats.misses += 1 # this will remove older cached models until # there is sufficient room to load the requested model @@ -201,6 +216,14 @@ class ModelCache(object): cache_entry = _CacheRecord(self, model, mem_used) self._cached_models[key] = cache_entry + else: + if self.stats: + self.stats.hits += 1 + self.stats.cache_size = self.max_cache_size * GIG + + if self.stats: + self.stats.high_watermark = max(self.stats.high_watermark, self._cache_size()) + self.stats.in_cache = len(self._cached_models) with suppress(Exception): self._cache_stack.remove(key) @@ -280,14 +303,14 @@ class ModelCache(object): """ Given the HF repo id or path to a model on disk, returns a unique hash. Works for legacy checkpoint files, HF models on disk, and HF repo IDs + :param model_path: Path to model file/directory on disk. 
""" return self._local_model_hash(model_path) def cache_size(self) -> float: - "Return the current size of the cache, in GB" - current_cache_size = sum([m.size for m in self._cached_models.values()]) - return current_cache_size / GIG + """Return the current size of the cache, in GB.""" + return self._cache_size() / GIG def _has_cuda(self) -> bool: return self.execution_device.type == "cuda" @@ -310,12 +333,15 @@ class ModelCache(object): f"Current VRAM/RAM usage: {vram}/{ram}; cached_models/loaded_models/locked_models/ = {cached_models}/{loaded_models}/{locked_models}" ) + def _cache_size(self) -> int: + return sum([m.size for m in self._cached_models.values()]) + def _make_cache_room(self, model_size): # calculate how much memory this model will require # multiplier = 2 if self.precision==torch.float32 else 1 bytes_needed = model_size maximum_size = self.max_cache_size * GIG # stored in GB, convert to bytes - current_size = sum([m.size for m in self._cached_models.values()]) + current_size = self._cache_size() if current_size + bytes_needed > maximum_size: self.logger.debug( @@ -364,6 +390,8 @@ class ModelCache(object): f"Unloading model {model_key} to free {(model_size/GIG):.2f} GB (-{(cache_entry.size/GIG):.2f} GB)" ) current_size -= cache_entry.size + if self.stats: + self.stats.cleared += 1 del self._cache_stack[pos] del self._cached_models[model_key] del cache_entry