InvokeAI/invokeai/backend/model_hash/model_hash.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

230 lines
8.1 KiB
Python
Raw Permalink Normal View History

# Copyright (c) 2023 Lincoln D. Stein and the InvokeAI Development Team
import hashlib
import os
from pathlib import Path
from typing import Callable, Literal, Optional, Union
2024-02-27 09:51:49 +00:00
from blake3 import blake3
from tqdm import tqdm
from invokeai.app.util.misc import uuid_string
HASHING_ALGORITHMS = Literal[
"blake3_multi",
"blake3_single",
"random",
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"blake2b",
"blake2s",
"sha3_224",
"sha3_256",
"sha3_384",
"sha3_512",
"shake_128",
"shake_256",
]
MODEL_FILE_EXTENSIONS = (".ckpt", ".safetensors", ".bin", ".pt", ".pth")
class ModelHash:
"""
Creates a hash of a model using a specified algorithm. The hash is prefixed by the algorithm used.
Args:
algorithm: Hashing algorithm to use. Defaults to BLAKE3.
file_filter: A function that takes a file name and returns True if the file should be included in the hash.
If the model is a single file, it is hashed directly using the provided algorithm.
If the model is a directory, each model weights file in the directory is hashed using the provided algorithm.
Only files with the following extensions are hashed: .ckpt, .safetensors, .bin, .pt, .pth
The final hash is computed by hashing the hashes of all model files in the directory using BLAKE3, ensuring
that directory hashes are never weaker than the file hashes.
A convenience algorithm choice of "random" is also available, which returns a random string. This is not a hash.
Usage:
```py
# BLAKE3 hash
ModelHash().hash("path/to/some/model.safetensors") # "blake3:ce3f0c5f3c05d119f4a5dcaf209b50d3149046a0d3a9adee9fed4c83cad6b4d0"
# MD5
ModelHash("md5").hash("path/to/model/dir/") # "md5:a0cd925fc063f98dbf029eee315060c3"
```
"""
def __init__(
self, algorithm: HASHING_ALGORITHMS = "blake3_single", file_filter: Optional[Callable[[str], bool]] = None
) -> None:
self.algorithm: HASHING_ALGORITHMS = algorithm
if algorithm == "blake3_multi":
self._hash_file = self._blake3
elif algorithm == "blake3_single":
self._hash_file = self._blake3_single
elif algorithm in hashlib.algorithms_available:
self._hash_file = self._get_hashlib(algorithm)
elif algorithm == "random":
self._hash_file = self._random
else:
raise ValueError(f"Algorithm {algorithm} not available")
self._file_filter = file_filter or self._default_file_filter
def hash(self, model_path: Union[str, Path]) -> str:
"""
Return hexdigest of hash of model located at model_path using the algorithm provided at class instantiation.
If model_path is a directory, the hash is computed by hashing the hashes of all model files in the
directory. The final composite hash is always computed using BLAKE3.
Args:
model_path: Path to the model
Returns:
str: Hexdigest of the hash of the model
"""
model_path = Path(model_path)
# blake3_single is a single-threaded version of blake3, prefix should still be "blake3:"
prefix = self._get_prefix(self.algorithm)
if model_path.is_file():
hash_ = None
# To give a similar user experience for single files and directories, we use a progress bar even for single files
pbar = tqdm([model_path], desc=f"Hashing {model_path.name}", unit="file")
for component in pbar:
pbar.set_description(f"Hashing {component.name}")
hash_ = prefix + self._hash_file(model_path)
assert hash_ is not None
return hash_
elif model_path.is_dir():
return prefix + self._hash_dir(model_path)
else:
raise OSError(f"Not a valid file or directory: {model_path}")
def _hash_dir(self, dir: Path) -> str:
"""Compute the hash for all files in a directory and return a hexdigest.
Args:
dir: Path to the directory
Returns:
str: Hexdigest of the hash of the directory
"""
model_component_paths = self._get_file_paths(dir, self._file_filter)
2024-02-27 09:51:49 +00:00
component_hashes: list[str] = []
pbar = tqdm(sorted(model_component_paths), desc=f"Hashing {dir.name}", unit="file")
for component in pbar:
pbar.set_description(f"Hashing {component.name}")
component_hashes.append(self._hash_file(component))
# BLAKE3 is cryptographically secure. We may as well fall back on a secure algorithm
# for the composite hash
composite_hasher = blake3()
for h in component_hashes:
composite_hasher.update(h.encode("utf-8"))
return composite_hasher.hexdigest()
@staticmethod
def _get_file_paths(model_path: Path, file_filter: Callable[[str], bool]) -> list[Path]:
"""Return a list of all model files in the directory.
Args:
model_path: Path to the model
file_filter: Function that takes a file name and returns True if the file should be included in the list.
Returns:
List of all model files in the directory
"""
files: list[Path] = []
for root, _dirs, _files in os.walk(model_path):
for file in _files:
if file_filter(file):
files.append(Path(root, file))
return files
@staticmethod
def _blake3(file_path: Path) -> str:
"""Hashes a file using BLAKE3, using parallelized and memory-mapped I/O to avoid reading the entire file into memory.
Args:
file_path: Path to the file to hash
Returns:
Hexdigest of the hash of the file
"""
file_hasher = blake3(max_threads=blake3.AUTO)
file_hasher.update_mmap(file_path)
return file_hasher.hexdigest()
2024-02-27 09:51:49 +00:00
@staticmethod
def _blake3_single(file_path: Path) -> str:
"""Hashes a file using BLAKE3, without parallelism. Suitable for spinning hard drives.
Args:
file_path: Path to the file to hash
Returns:
Hexdigest of the hash of the file
"""
file_hasher = blake3()
file_hasher.update_mmap(file_path)
return file_hasher.hexdigest()
@staticmethod
def _get_hashlib(algorithm: HASHING_ALGORITHMS) -> Callable[[Path], str]:
"""Factory function that returns a function to hash a file with the given algorithm.
Args:
algorithm: Hashing algorithm to use
Returns:
A function that hashes a file using the given algorithm
"""
def hashlib_hasher(file_path: Path) -> str:
"""Hashes a file using a hashlib algorithm. Uses `memoryview` to avoid reading the entire file into memory."""
hasher = hashlib.new(algorithm)
buffer = bytearray(128 * 1024)
mv = memoryview(buffer)
with open(file_path, "rb", buffering=0) as f:
while n := f.readinto(mv):
hasher.update(mv[:n])
return hasher.hexdigest()
return hashlib_hasher
@staticmethod
def _random(_file_path: Path) -> str:
"""Returns a random string. This is not a hash.
The string is a UUID, hashed with BLAKE3 to ensure that it is unique."""
return blake3(uuid_string().encode()).hexdigest()
@staticmethod
def _default_file_filter(file_path: str) -> bool:
"""A default file filter that only includes files with the following extensions: .ckpt, .safetensors, .bin, .pt, .pth
Args:
file_path: Path to the file
Returns:
True if the file matches the given extensions, otherwise False
"""
return file_path.endswith(MODEL_FILE_EXTENSIONS)
@staticmethod
def _get_prefix(algorithm: HASHING_ALGORITHMS) -> str:
"""Return the prefix for the given algorithm, e.g. \"blake3:\" or \"md5:\"."""
# blake3_single is a single-threaded version of blake3, prefix should still be "blake3:"
return "blake3:" if algorithm == "blake3_single" or algorithm == "blake3_multi" else f"{algorithm}:"