# Mirror of https://github.com/invoke-ai/InvokeAI
# Synced 2024-08-30 20:32:17 +00:00 — 94 lines, 2.9 KiB, Python
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
from abc import ABC, abstractmethod
from pathlib import Path
from queue import Queue
from typing import Dict, Union, Optional
import torch


class LatentsStorageBase(ABC):
    """Abstract interface for storing and retrieving latent tensors by name."""

    @abstractmethod
    def get(self, name: str) -> torch.Tensor:
        """Return the latents tensor stored under ``name``."""
        ...

    @abstractmethod
    def save(self, name: str, data: torch.Tensor) -> None:
        """Persist ``data`` under ``name``."""
        ...

    @abstractmethod
    def delete(self, name: str) -> None:
        """Remove the latents stored under ``name``."""
        ...


class ForwardCacheLatentsStorage(LatentsStorageBase):
    """Caches the latest N latents in memory, writing through to and reading from underlying storage.

    Reads are served from the in-memory cache when possible; writes go to the
    underlying storage first and are then cached. Eviction is FIFO by first
    insertion order, bounded by ``max_cache_size``.
    """

    # name -> tensor for recently saved/fetched latents
    __cache: Dict[str, torch.Tensor]
    # FIFO of cached names, in first-insertion order, used for eviction
    __cache_ids: Queue
    # maximum number of entries kept in __cache
    __max_cache_size: int
    # durable backing store that this cache fronts
    __underlying_storage: LatentsStorageBase

    def __init__(self, underlying_storage: LatentsStorageBase, max_cache_size: int = 20):
        """Wrap ``underlying_storage`` with an in-memory cache of at most ``max_cache_size`` entries."""
        self.__underlying_storage = underlying_storage
        self.__cache = dict()
        self.__cache_ids = Queue()
        self.__max_cache_size = max_cache_size

    def get(self, name: str) -> torch.Tensor:
        """Return latents ``name`` — from the cache on a hit, else from underlying storage (then cached)."""
        cache_item = self.__get_cache(name)
        if cache_item is not None:
            return cache_item

        latent = self.__underlying_storage.get(name)
        self.__set_cache(name, latent)
        return latent

    def save(self, name: str, data: torch.Tensor) -> None:
        """Write ``data`` through to the underlying storage, then cache it."""
        self.__underlying_storage.save(name, data)
        self.__set_cache(name, data)

    def delete(self, name: str) -> None:
        """Delete ``name`` from the underlying storage and drop any cached copy.

        The name may linger in the eviction queue; eviction tolerates names
        that are no longer present in the cache (see __set_cache).
        """
        self.__underlying_storage.delete(name)
        if name in self.__cache:
            del self.__cache[name]

    def __get_cache(self, name: str) -> Optional[torch.Tensor]:
        """Return the cached tensor for ``name``, or None on a cache miss."""
        return None if name not in self.__cache else self.__cache[name]

    def __set_cache(self, name: str, data: torch.Tensor):
        """Insert ``data`` under ``name``, evicting the oldest entry when over capacity."""
        if name not in self.__cache:
            self.__cache[name] = data
            self.__cache_ids.put(name)
            if self.__cache_ids.qsize() > self.__max_cache_size:
                # Bug fix: pop with a default — the evicted name may already
                # have been removed by delete(), which purges __cache but
                # cannot remove entries from the Queue; a bare pop() would
                # raise KeyError in that case.
                self.__cache.pop(self.__cache_ids.get(), None)


class DiskLatentsStorage(LatentsStorageBase):
    """Stores latents in a folder on disk without caching."""

    # folder holding one serialized tensor file per latents name
    __output_folder: Union[str, Path]

    def __init__(self, output_folder: Union[str, Path]):
        """Root the storage at ``output_folder``, creating the folder if needed."""
        folder = Path(output_folder) if not isinstance(output_folder, Path) else output_folder
        self.__output_folder = folder
        folder.mkdir(parents=True, exist_ok=True)

    def get(self, name: str) -> torch.Tensor:
        """Load and return the tensor saved under ``name``."""
        return torch.load(self.get_path(name))

    def save(self, name: str, data: torch.Tensor) -> None:
        """Serialize ``data`` to disk under ``name``, re-creating the folder if it vanished."""
        self.__output_folder.mkdir(parents=True, exist_ok=True)
        torch.save(data, self.get_path(name))

    def delete(self, name: str) -> None:
        """Remove the on-disk file backing ``name``."""
        self.get_path(name).unlink()

    def get_path(self, name: str) -> Path:
        """Return the full filesystem path for the latents named ``name``."""
        return self.__output_folder / name