Compare commits

...

8 Commits

8 changed files with 1225 additions and 2 deletions

View File

@@ -453,9 +453,9 @@ class PipelineFolderProbe(FolderProbeBase):
else:
with open(self.folder_path / "scheduler" / "scheduler_config.json", "r") as file:
scheduler_conf = json.load(file)
if scheduler_conf["prediction_type"] == "v_prediction":
if scheduler_conf.get("prediction_type", "epsilon") == "v_prediction":
return SchedulerPredictionType.VPrediction
elif scheduler_conf["prediction_type"] == "epsilon":
elif scheduler_conf.get("prediction_type", "epsilon") == "epsilon":
return SchedulerPredictionType.Epsilon
else:
return None

View File

@@ -67,6 +67,7 @@ class SubModelType(str, Enum):
VaeEncoder = "vae_encoder"
Scheduler = "scheduler"
SafetyChecker = "safety_checker"
FeatureExtractor = "feature_extractor"
# MoVQ = "movq"

View File

@@ -0,0 +1,446 @@
# Normalized Model Manager
This is proof-of-principle code that refactors model storage to be
more space-efficient and less dependent on the particulars of Stable
Diffusion models. The driving observation is that there is a
significant amount of redundancy in Stable Diffusion models: the VAE,
tokenizer and safety checker, for example, are frequently identical
across multiple models derived from the same base model.

The way the normalized model manager works is that when a main
(pipeline) model is ingested, each of its submodels ("vae", "unet" and
so forth) is scanned and hashed using a fast sampling and hashing
algorithm. If the submodel's hash hasn't been seen before, the
submodel is copied into a folder within INVOKEAI_ROOT, and a new
database entry is created with the submodel's path and a reference
count of 1. If the hash has been seen before, the database is updated
to bump up the existing submodel's reference count.

Checkpoint files (.bin, .ckpt and .safetensors) are converted into
diffusers format prior to ingestion. Simple models, such as LoRAs and
standalone VAEs, are imported directly and deduplicated against
previously seen models, which pays off when a user ingests the same
VAE twice under different names.

Additional database tables record the relationship between main models
and their submodels, and which base model(s) a submodel is compatible
with.
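The core dedupe step, condensed from `ingest_pipeline_model()` and
`_install_part()` later in this PR (a sketch, not the verbatim
implementation):
```
# For each submodel folder of an ingested pipeline:
part_hash = FastModelHash.hash(part_path)       # fast sampling hash
part_id = self._lookup_part_by_hash(part_hash)  # seen this blob before?
if part_id is None:
    # New submodel: copy it into INVOKEAI_ROOT/model_blobs and create
    # a simple_model row. Its refcount is maintained by a database
    # trigger that fires when the part is linked to a named model.
    part_id = self._install_part(part_hash, part_path)
# Link the (new or shared) part to the pipeline being ingested.
parts_to_insert.append((pipeline_id, part_id, part_name))
```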
## Installation and Testing
To test, check out the PR and run `pip install -e .`. This will create
a command called `invokeai-nmm` (for "normalized model
manager"). To ingest a single model:
```
invokeai-nmm ingest my_model.safetensors
```
To ingest a whole directory of models:
```
invokeai-nmm ingest my_models/*
```
These commands will create a sqlite3 database of model data in
`INVOKEAI_ROOT/databases/normalized_models.db`, copy the model data
into a blobs directory under `INVOKEAI_ROOT/model_blobs`, and create
appropriate entries in the database. You can then use the API to
retrieve information on pipelines and submodels.
The `invokeai-nmm` tool has a number of other features, including
listing models and examining pipeline subparts. In addition, it has an
`export` command which will reconstitute a diffusers pipeline by
creating a directory containing symbolic links into the blobs
directory.
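For example, to reconstitute a pipeline into a destination folder
(model name illustrative):
```
invokeai-nmm export stable-diffusion-v1-5 /tmp/exported_models
```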
Use `invokeai-nmm --help` to get a summary of commands and their
flags.
## Benchmarking
To test the performance of the normalized model system, I ingested an
InvokeAI models directory of 117 different models (35 main models, 52
LoRAs, 9 controlnets, 8 embeddings and miscellaneous others). The
ingestion, which included the conversion of multiple checkpoint models
into diffusers format, took about 2 minutes. Prior to ingestion, the
directory took up 189.5 GB. After ingestion, it was reduced to 160 GB,
an overall 16% reduction in size and a savings of 29 GB.

I was surprised at the relatively modest space savings and checked
that submodels were indeed being shared. They were:
```
sqlite> select part_id,type,refcount from simple_model order by refcount desc,type;
┌─────────┬───────────────────┬──────────┐
│ part_id │ type │ refcount │
├─────────┼───────────────────┼──────────┤
│ 28 │ tokenizer │ 9 │
│ 67 │ feature_extractor │ 7 │
│ 33 │ feature_extractor │ 5 │
│ 38 │ tokenizer │ 5 │
│ 26 │ safety_checker │ 4 │
│ 32 │ safety_checker │ 4 │
│ 37 │ scheduler │ 4 │
│ 29 │ vae │ 3 │
│ 30 │ feature_extractor │ 2 │
│ 72 │ safety_checker │ 2 │
│ 54 │ scheduler │ 2 │
│ 100 │ scheduler │ 2 │
│ 71 │ text_encoder │ 2 │
│ 90 │ text_encoder │ 2 │
│ 99 │ text_encoder_2 │ 2 │
│ 98 │ tokenizer_2 │ 2 │
│ 44 │ vae │ 2 │
│ 73 │ vae │ 2 │
│ 91 │ vae │ 2 │
│ 97 │ vae │ 2 │
│ 1 │ clip_vision │ 1 │
│ 2 │ clip_vision │ 1 │
...
```
As expected, submodels that don't change from model to model, such as
the tokenizer and safety checker, are frequently shared across main
models. VAEs were also shared, but less frequently than I expected. On
further inspection, the spread of VAEs was explained by the following
formatting differences:
1. Whether the VAE weights are .bin or .safetensors
2. Whether it is an fp16 or fp32 VAE
3. Actual differences in the VAE's training
Ironically, checkpoint models downloaded from Civitai are more likely
to share submodels than diffusers pipelines directly downloaded from
HuggingFace. This is because the checkpoints pass through a uniform
conversion process, while diffusers downloaded directly from
HuggingFace are more likely to have format-related differences.
## Database tables
The following sections illustrate the database schema.
### SIMPLE_MODEL
This table provides the type and path of each fundamental model. The
type can be any of the `ModelType` or `SubModelType` enum values
(`clip_vision`, `tokenizer`, etc.).
```
┌─────────┬───────────────────┬──────────────────────────────────┬──────────┬──────────────────────────────────────────────────────────────────────────────────────────┐
│ part_id │ type │ hash │ refcount │ path │
├─────────┼───────────────────┼──────────────────────────────────┼──────────┼──────────────────────────────────────────────────────────────────────────────────────────┤
│ 26 │ safety_checker │ 76b420d8f641411021ec1dadca767cf7 │ 4 │ /opt/model_blobs/safety_checker-7214b322-1069-4753-a4d5-fe9e18915ca7 │
│ 28 │ tokenizer │ 44e42c7bf25b5e32e8d7de0b822cf012 │ 9 │ /opt/model_blobs/tokenizer-caeb7f7f-e3db-4d67-8f60-1a4831e1aef2 │
│ 29 │ vae │ c9aa45f52c5d4e15a22677f34436d373 │ 3 │ /opt/model_blobs/vae-7e7d96ee-074f-45dc-8c43-c9902b0d0671 │
│ 30 │ feature_extractor │ 3240f79383fdf6ea7f24bbd5569cb106 │ 2 │ /opt/model_blobs/feature_extractor-a5bb8ceb-2c15-4b7f-bd43-964396440f6c │
│ 32 │ safety_checker │ 2e2f7732cff3349350bc99f3e7ab3998 │ 4 │ /opt/model_blobs/safety_checker-ef70c446-e3a1-445c-b216-d7c4acfdbcda │
└─────────┴───────────────────┴──────────────────────────────────┴──────────┴──────────────────────────────────────────────────────────────────────────────────────────┘
```
The `refcount` field indicates how many named models share the
fundamental model. The path is where the submodel is stored; it uses a
randomly-assigned file/directory name to avoid collisions.
The `type` field is an enum emulated in SQLite as a TEXT column with a
CHECK constraint; its allowed values are those of the
`ExtendedModelType` enum described later in this document.
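Concretely, this is the table definition from `_create_tables()` in
this PR, with the generated list of allowed `type` values abridged:
```
CREATE TABLE IF NOT EXISTS simple_model (
    part_id INTEGER PRIMARY KEY,
    type TEXT CHECK( type IN ("vae", "unet", "tokenizer", ...) ) NOT NULL,
    hash TEXT UNIQUE,
    refcount INTEGER NOT NULL DEFAULT '0',
    path TEXT NOT NULL
);
```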
### MODEL_NAME
The MODEL_NAME table stores the name and other metadata of a top-level
model. The same table is used for both simple models (one part only)
and pipeline models (multiple parts).
Note that in the current implementation, the model name is forced to
be unique and is currently used as the identifier for retrieving
models from the database. This is a simplifying implementation detail;
in a real system the name would be supplemented with some sort of
anonymous key.
Only top-level models are entered into the MODEL_NAME table. The
models contained in subfolders of a pipeline become unnamed anonymous
parts stored in SIMPLE_MODEL and associated with the named model(s)
that use them in the MODEL_PARTS table described next.
An interesting piece of behavior is that the same simple model can be
both anonymous and named. Consider a VAE that is first imported from
the 'vae' folder of a main model. Because it is part of a larger
pipeline, there will be an entry for the VAE in SIMPLE_MODEL with a
refcount of 1, but not in the MODEL_NAME table. However, let's say
that, at a later date, the user ingests the same model as a named
standalone VAE. The system will detect that this is the same model,
and will create a named entry for the VAE in MODEL_NAME that identifies
the VAE as its sole part. In SIMPLE_MODEL, the VAE's refcount will be
bumped up to 2. Thus, the same simple model can be retrieved in two
ways: by requesting the "vae" submodel of the named pipeline, or by
requesting it via its standalone name.
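In API terms, the two retrieval paths converge on the same blob (the
model names here are hypothetical):
```
# Retrieve the same simple model two ways (hypothetical names).
vae_as_part = nmm.get_pipeline('my_main_model').parts['vae']
vae_by_name = nmm.get_model('my_standalone_vae')
assert vae_as_part.path == vae_by_name.path  # one blob on disk
```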
The MODEL_NAME table has fields for the model's name, its source and
its description. The `is_pipeline` field is True if the named model is
a pipeline that contains subparts. In the case of a pipeline, the
`table_of_contents` field holds a copy of the contents of
`model_index.json`. This is used for the sole purpose of regenerating
a de-normalized diffusers folder from the database.
```
┌──────────┬────────────────────────────────┬────────────────────────────────────────────────────────────┬───────────────────────────────────────────────┬─────────────┬───────────────────┐
│ model_id │ name │ source │ description │ is_pipeline │ table_of_contents │
├──────────┼────────────────────────────────┼────────────────────────────────────────────────────────────┼───────────────────────────────────────────────┼─────────────┼───────────────────┤
│ 1 │ ip_adapter_sd_image_encoder │ /opt/models/any/clip_vision/ip_adapter_sd_image_encoder │ Imported model ip_adapter_sd_image_encoder │ 0 │ │
│ 2 │ ip_adapter_sd_image_encoder_01 │ /opt/models/any/clip_vision/ip_adapter_sd_image_encoder_01 │ Imported model ip_adapter_sd_image_encoder_01 │ 0 │ │
│ 3 │ ip_adapter_sdxl_image_encoder │ /opt/models/any/clip_vision/ip_adapter_sdxl_image_encoder │ Imported model ip_adapter_sdxl_image_encoder │ 0 │ │
│ 4 │ control_v11e_sd15_ip2p │ /opt/models/sd-1/controlnet/control_v11e_sd15_ip2p │ Imported model control_v11e_sd15_ip2p │ 0 │ │
│ 5 │ control_v11e_sd15_shuffle │ /opt/models/sd-1/controlnet/control_v11e_sd15_shuffle │ Imported model control_v11e_sd15_shuffle │ 0 │ │
│ 6 │ control_v11f1e_sd15_tile │ /opt/models/sd-1/controlnet/control_v11f1e_sd15_tile │ Imported model control_v11f1e_sd15_tile │ 0 │ │
│ 7 │ control_v11f1p_sd15_depth │ /opt/models/sd-1/controlnet/control_v11f1p_sd15_depth │ Imported model control_v11f1p_sd15_depth │ 0 │ │
│ 8 │ control_v11p_sd15_canny │ /opt/models/sd-1/controlnet/control_v11p_sd15_canny │ Imported model control_v11p_sd15_canny │ 0 │ │
│ 9 │ control_v11p_sd15_inpaint │ /opt/models/sd-1/controlnet/control_v11p_sd15_inpaint │ Imported model control_v11p_sd15_inpaint │ 0 │ │
│ 10 │ control_v11p_sd15_lineart │ /opt/models/sd-1/controlnet/control_v11p_sd15_lineart │ Imported model control_v11p_sd15_lineart │ 0 │ │
└──────────┴────────────────────────────────┴────────────────────────────────────────────────────────────┴───────────────────────────────────────────────┴─────────────┴───────────────────┘
```
### MODEL_PARTS
The MODEL_PARTS table maps the `model_id` field from MODEL_NAME to the
`part_id` field of SIMPLE_MODEL, as shown below. The `part_name` field
contains the subfolder name that the part was located in at model
ingestion time.
There is not exactly a one-to-one correspondence between the
MODEL_PARTS `part_name` and the SIMPLE_MODEL `type` fields. For
example, SDXL models have part_names of `text_encoder` and
`text_encoder_2`, both of which point to a simple model of type
`text_encoder`.
For one-part models such as LoRAs, the `part_name` is `root`.
```
┌──────────┬─────────┬───────────────────┐
│ model_id │ part_id │ part_name │
├──────────┼─────────┼───────────────────┤
│ 6 │ 6 │ root │
│ 25 │ 25 │ unet │
│ 25 │ 26 │ safety_checker │
│ 25 │ 27 │ text_encoder │
│ 25 │ 28 │ tokenizer │
│ 25 │ 29 │ vae │
│ 25 │ 30 │ feature_extractor │
│ 25 │ 31 │ scheduler │
│ 26 │ 32 │ safety_checker │
│ 26 │ 33 │ feature_extractor │
│ 26 │ 34 │ unet │
└──────────┴─────────┴───────────────────┘
```
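Resolving a named pipeline into its parts is a join across these
tables; this is essentially the query that `get_pipeline()` issues
(simplified here by omitting the base-compatibility join):
```
SELECT b.part_name, c.type, c.path
FROM model_name AS a, model_parts AS b, simple_model AS c
WHERE a.name = ?
  AND a.model_id = b.model_id
  AND b.part_id = c.part_id;
```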
### MODEL_BASE
The MODEL_BASE table maps simple models to the base models that they
are compatible with. A simple model may be compatible with one base
only (e.g. an SDXL-based `unet`); it may be compatible with multiple
bases (e.g. a VAE that works with either `sd-1` or `sd-2`); or it may
be compatible with all models (e.g. a `clip_vision` model).
This table has two fields, the `part_id` and the `base` the part is
compatible with. Like `type`, the base is a TEXT column with a CHECK
constraint whose allowed values are those of the `BaseModelType` enum.
```
sqlite> select * from model_base limit 8;
┌─────────┬──────────────┐
│ part_id │ base │
├─────────┼──────────────┤
│ 1 │ sd-1 │
│ 1 │ sd-2 │
│ 1 │ sdxl │
│ 1 │ sdxl-refiner │
│ 2 │ sd-1 │
│ 2 │ sd-2 │
│ 2 │ sdxl │
│ 2 │ sdxl-refiner │
└─────────┴──────────────┘
```
At ingestion time, the MODEL_BASE table is populated using the
following algorithm:
1. If the ingested model is a multi-part pipeline, then each of its
parts is assigned the base determined by probing the pipeline as a
whole.
2. If the ingested model is a single-part simple model, then its part
is assigned to the base returned by probing the simple model.
3. Any models that return `BaseModelType.Any` at probe time will be
assigned to all four of the base model types as shown in the
example above.
Interestingly, the table will "learn" when the same simple model is
compatible with multiple bases. Consider a sequence of events in which
the user ingests an sd-1 model containing a VAE. The VAE will
initially get a single row in the MODEL_BASE table with base
"sd-1". Next the user ingests an sd-2 model that contains the same
VAE. The system will recognize that the same VAE is being used for a
model with a different base, and will add a new row to the table
indicating that this VAE is compatible with either sd-1 or sd-2.
When retrieving information about a multipart pipeline through the
API, the system intersects the base-compatibility sets of all the
pipeline's components to find the base(s) that every subpart
supports.
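A condensed sketch of that intersection, using Python sets as
`get_pipeline()` does:
```
# bases maps part_name -> set of compatible bases for that part.
# The pipeline as a whole supports only the bases common to all parts.
compatible = set.intersection(*bases.values())  # assumes >= 1 part
```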
## The API
Initialization will look something like this:
```
from pathlib import Path
from invokeai.app.services.config import InvokeAIAppConfig
from invokeai.backend.normalized_mm.normalized_model_manager import NormalizedModelManager
config = InvokeAIAppConfig.get_config()
config.parse_args()
nmm = NormalizedModelManager(config)
```
At the current time, the InvokeAIAppConfig object is used only to
locate the root directory path and the location of the `databases`
subdirectory.
## Ingesting a model
Apply the `ingest()` method to a checkpoint or diffusers folder Path
and an optional model name. If the model name isn't provided, then it
will be derived from the stem of the ingested filename/folder.
```
model_config = nmm.ingest(
Path('/tmp/models/slick_anime.safetensors'),
name="Slick Anime",
)
```
Depending on what is being ingested, the call will return either a
`SimpleModelConfig` or a `PipelineConfig` object, which are slightly
different from each other:
```
@dataclass
class SimpleModelConfig:
"""Submodel name, description, type and path."""
name: str
description: str
base_models: Set[BaseModelType]
type: ExtendedModelType
path: Path
@dataclass
class PipelineConfig:
"""Pipeline model name, description, type and parts."""
name: str
description: str
base_models: Set[BaseModelType]
parts: Dict[str, ModelPart] # part_name -> ModelPart
@dataclass
class ModelPart:
"""Type and path of a pipeline submodel."""
type: ExtendedModelType
path: Path
refcount: int
```
For more control, you can directly call the `ingest_pipeline_model()`
or `ingest_simple_model()` methods, which operate on multi-part
pipelines and single-part models respectively.
Note that the `ExtendedModelType` class is an enum created from the
union of the current model manager's `ModelType` and
`SubModelType`. This was necessary to support the SIMPLE_MODEL table's
`type` field.
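The construction itself is short; this is how the PR builds it using
the functional `Enum` API:
```
from enum import Enum

from invokeai.backend.model_management import ModelType, SubModelType

# Merge the value maps of ModelType and SubModelType into a single
# string-valued enum usable in the SIMPLE_MODEL `type` column.
model_types = {x.name: x.value for x in ModelType}
model_types.update({x.name: x.value for x in SubModelType})
ExtendedModelType = Enum("ExtendedModelType", model_types, type=str)
```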
## Fetching a model
To fetch a simple model, call `get_model()` with the name of the model
and optionally its part_name. This returns a `SimpleModelConfig` object.
```
model_info = nmm.get_model(name='stable-diffusion-v1-5', part='unet')
print(model_info.path)
print(model_info.description)
print(model_info.base_models)
```
If the model only has one part, leave out the `part` argument, or use
`part="root"`:
```
model_info = nmm.get_model(name='detail_slider_v1')
```
To fetch information about a pipeline model, call `get_pipeline()`:
```
model_info = nmm.get_pipeline('stable-diffusion-v1-5')
for part_name, part in model_info.parts.items():
print(f'{part_name} is located at {part.path}')
```
This returns a `PipelineConfig` object, which you can then interrogate
to get the model's name, description, list of base models it is
compatible with, and its parts. The latter is a dict mapping the
part_name (the original subfolder name) to a `ModelPart` object that
contains the part's type, refcount and path.
## Exporting a model
To export a model back into its native format (diffusers for main,
safetensors for other types), use `export_pipeline`:
```
nmm.export_pipeline(name='stable-diffusion-v1-5', destination='/path/to/export/folder')
```
The model will be exported as a folder at
`/path/to/export/folder/stable-diffusion-v1-5`. It will contain a copy
of the original `model_index.json` file and a series of symbolic
links, one per subfolder, pointing into the model blobs directory.
Despite its name, `export_pipeline()` works as expected with simple
models as well.
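For example, exporting the one-part LoRA from the earlier example
creates a single symbolic link, with the original file suffix appended
to the destination name:
```
# Produces e.g. /path/to/export/folder/detail_slider_v1.safetensors
nmm.export_pipeline(name='detail_slider_v1', destination=Path('/path/to/export/folder'))
```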
## Listing models in the database
There is currently a `list_models()` method that retrieves a list of
all the **named** models in the database. It doesn't yet provide any
way of filtering by name, type or base compatibility, but these would
be easy to add in the future.
`list_models()` returns a list of `ModelListing` objects:
```
@dataclass
class ModelListing:
"""Slightly simplified object for generating listings."""
name: str
description: str
source: str
type: ModelType
base_models: Set[BaseModelType]
```
An alternative implementation might return a list of
`Union[SimpleModelConfig, PipelineConfig]`, but it seemed cleanest to
return a uniform list.
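A typical listing loop, using only the fields above (this mirrors
what the `invokeai-nmm list` command does):
```
for listing in nmm.list_models():
    bases = ', '.join(b.value for b in listing.base_models)
    print(f"{listing.name:30s} {listing.type.value:10s} {bases}")
```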
## Deleting models
Model deletion is not currently fully implemented. When implemented,
deletion of a named model will decrement the refcount of each of its
subparts and then delete parts whose refcount has reached zero. The
appropriate triggers for incrementing and decrementing the refcount
have already been implemented in the database schema.
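The deletion side of that bookkeeping is already present in the
schema; this is the trigger as defined in `_create_tables()`:
```
CREATE TRIGGER IF NOT EXISTS delete_model_refcount
AFTER DELETE ON model_parts FOR EACH ROW
BEGIN
    UPDATE simple_model SET refcount=refcount-1
    WHERE simple_model.part_id=old.part_id;
END;
```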

View File

@@ -0,0 +1,108 @@
#!/usr/bin/env python
import argparse
import sys
from pathlib import Path
from typing import Optional
from invokeai.app.services.config import InvokeAIAppConfig
from invokeai.backend.normalized_mm.normalized_model_manager import (
DuplicateModelException,
InvalidModelException,
ModelNotFoundException,
NormalizedModelManager,
)
config: InvokeAIAppConfig = InvokeAIAppConfig.get_config()
model_manager: Optional[NormalizedModelManager] = None
def list_parts(args):
try:
model = model_manager.get_pipeline(args.model_name)
print(f"Components of model {args.model_name}:")
print(f" {'ROLE':20s} {'TYPE':20s} {'REFCOUNT':8} PATH")
for role, part in model.parts.items():
print(f" {role:20s} {part.type:20s} {part.refcount:4d} {part.path}")
except ModelNotFoundException:
print(f"{args.model_name}: model not found")
def list_models(args):
model_list = model_manager.list_models()
print(f"{'NAME':30s} {'TYPE':10s} {'BASE(S)':10s} {'DESCRIPTION':40s} ORIGINAL SOURCE")
for model in model_list:
print(
f"{model.name:30s} {model.type.value:10s} {', '.join([x.value for x in model.base_models]):10s} {model.description:40s} {model.source}"
)
def ingest_models(args):
for path in args.model_paths:
try:
print(f"ingesting {path}...", end="")
model_manager.ingest(path)
print("success.")
except (OSError, InvalidModelException, DuplicateModelException) as e:
print(f"FAILED: {e}")
def export_model(args):
print(f"exporting {args.model_name} to {args.destination}...", end="")
try:
model_manager.export_pipeline(args.model_name, args.destination)
print("success.")
except (OSError, ModelNotFoundException, InvalidModelException) as e:
print(f"FAILED: {e}")
def main():
global model_manager
global config
parser = argparse.ArgumentParser(description="Normalized model manager util")
parser.add_argument("--root_dir", dest="root", type=str, default=None, help="path to INVOKEAI_ROOT")
subparsers = parser.add_subparsers(help="commands")
parser_ingest = subparsers.add_parser("ingest", help="ingest checkpoint or diffusers models")
parser_ingest.add_argument("model_paths", type=Path, nargs="+", help="paths to one or more models to be ingested")
parser_ingest.set_defaults(func=ingest_models)
parser_export = subparsers.add_parser("export", help="export a pipeline to indicated directory")
parser_export.add_argument(
"model_name",
type=str,
help="name of model to export",
)
parser_export.add_argument(
"destination",
type=Path,
help="path to destination to export pipeline to",
)
parser_export.set_defaults(func=export_model)
parser_list = subparsers.add_parser("list", help="list models")
parser_list.set_defaults(func=list_models)
parser_listparts = subparsers.add_parser("list-parts", help="list the parts of a pipeline model")
parser_listparts.add_argument(
"model_name",
type=str,
help="name of pipeline model to list parts of",
)
parser_listparts.set_defaults(func=list_parts)
if len(sys.argv) <= 1:
sys.argv.append("--help")
args = parser.parse_args()
if args.root:
config.parse_args(["--root", args.root])
else:
config.parse_args([])
model_manager = NormalizedModelManager(config)
args.func(args)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,66 @@
# Copyright (c) 2023 Lincoln D. Stein and the InvokeAI Development Team
"""
Fast hashing of diffusers and checkpoint-style models.
Usage:
from invokeai.backend.normalized_mm.hash import FastModelHash
>>> FastModelHash.hash('/home/models/stable-diffusion-v1.5')
'a8e693a126ea5b831c96064dc569956f'
"""
import hashlib
import os
from pathlib import Path
from typing import Dict, Union
from imohash import hashfile
from invokeai.backend.model_management.models import InvalidModelException
class FastModelHash(object):
"""FastModelHash obect provides one public class method, hash()."""
@classmethod
def hash(cls, model_location: Union[str, Path]) -> str:
"""
Return hexdigest string for model located at model_location.
:param model_location: Path to the model
"""
model_location = Path(model_location)
if model_location.is_file():
return cls._hash_file(model_location)
elif model_location.is_dir():
return cls._hash_dir(model_location)
else:
raise InvalidModelException(f"Not a valid file or directory: {model_location}")
@classmethod
def _hash_file(cls, model_location: Union[str, Path]) -> str:
"""
Fasthash a single file and return its hexdigest.
:param model_location: Path to the model file
"""
# we return md5 hash of the filehash to make it shorter
# cryptographic security not needed here
return hashlib.md5(hashfile(model_location)).hexdigest()
@classmethod
def _hash_dir(cls, model_location: Union[str, Path]) -> str:
components: Dict[Path, str] = {}
for root, dirs, files in os.walk(model_location):
for file in files:
path = Path(root) / file
if path.name == "config.json": # don't use - varies according to diffusers version
continue
fast_hash = cls._hash_file(path.as_posix())
components.update({path: fast_hash})
# hash all the model hashes together, using alphabetic file order
md5 = hashlib.md5()
for path, fast_hash in sorted(components.items()):
md5.update(fast_hash.encode("utf-8"))
return md5.hexdigest()

View File

@@ -0,0 +1,601 @@
import sqlite3
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from shutil import copy, copytree
from tempfile import TemporaryDirectory
from typing import Dict, List, Optional, Set, Tuple, Union
from uuid import uuid4
from diffusers import StableDiffusionInpaintPipeline, StableDiffusionPipeline
from invokeai.app.services.config import InvokeAIAppConfig
from ..model_management import BaseModelType, DuplicateModelException, ModelNotFoundException, ModelType, SubModelType
from ..model_management.convert_ckpt_to_diffusers import convert_ckpt_to_diffusers
from ..model_management.model_probe import InvalidModelException, ModelProbe, ModelVariantType
from ..util.devices import choose_torch_device, torch_dtype
from .hash import FastModelHash
# We create a new enumeration for model types
model_types = {x.name: x.value for x in ModelType}
model_types.update({x.name: x.value for x in SubModelType})
ExtendedModelType = Enum("ExtendedModelType", model_types, type=str)
# Turn into a SQL enum
MODEL_TYPES = {x.value for x in ExtendedModelType}
MODEL_SQL_ENUM = ",".join([f'"{x}"' for x in MODEL_TYPES])
# And the same for the base model types
BASE_TYPES = {x.value for x in BaseModelType}
BASE_SQL_ENUM = ",".join([f'"{x}"' for x in BASE_TYPES])
@dataclass
class ModelPart:
"""Type and path of a pipeline submodel."""
type: ExtendedModelType
path: Path
refcount: int
@dataclass
class SimpleModelConfig:
"""Submodel name, description, type and path."""
name: str
description: str
base_models: Set[BaseModelType]
type: ExtendedModelType
path: Path
@dataclass
class PipelineConfig:
"""Pipeline model name, description, type and parts."""
name: str
description: str
base_models: Set[BaseModelType]
parts: Dict[str, ModelPart]
@dataclass
class ModelListing:
"""Slightly simplified object for generating listings."""
name: str
description: str
source: str
type: ModelType
base_models: Set[BaseModelType]
class NormalizedModelManager:
_conn: sqlite3.Connection
_cursor: sqlite3.Cursor
_blob_directory: Path
def __init__(self, config: InvokeAIAppConfig):
database_file = config.db_path.parent / "normalized_models.db"
Path(database_file).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(database_file, check_same_thread=True)
self._conn.row_factory = sqlite3.Row
self._conn.isolation_level = "DEFERRED"
self._cursor = self._conn.cursor()
self._blob_directory = config.root_path / "model_blobs"
self._blob_directory.mkdir(parents=True, exist_ok=True)
self._conn.execute("PRAGMA foreign_keys = ON;")
self._create_tables()
self._conn.commit()
def ingest(self, model_path: Path, name: Optional[str] = None) -> Union[SimpleModelConfig, PipelineConfig]:
"""Ingest a simple or pipeline model into the normalized models database."""
model_path = model_path.absolute()
info = ModelProbe.probe(model_path)
if info.model_type == ModelType.Main:
return self.ingest_pipeline_model(model_path, name)
else:
return self.ingest_simple_model(model_path, name)
def ingest_simple_model(self, model_path: Path, name: Optional[str] = None) -> SimpleModelConfig:
"""Insert a simple one-part model, returning its config."""
model_name = name or model_path.stem
model_hash = FastModelHash.hash(model_path)
try:
# retrieve or create the single part that goes into this model
part_id = self._lookup_part_by_hash(model_hash) or self._install_part(model_hash, model_path)
# create the model name/source entry
self._cursor.execute(
"""--sql
INSERT INTO model_name (
name, source, description, is_pipeline
)
VALUES (?, ?, ?, 0);
""",
(model_name, model_path.as_posix(), f"Imported model {model_name}"),
)
# associate the part with the model
model_id = self._cursor.lastrowid
self._cursor.execute(
"""--sql
INSERT INTO model_parts (
model_id, part_id
)
VALUES (?, ?);
""",
(
model_id,
part_id,
),
)
self._conn.commit()
except sqlite3.Error as e:
self._conn.rollback()
if isinstance(e, sqlite3.IntegrityError):
raise DuplicateModelException(f"a model named {model_name} is already in the database") from e
else:
raise e
return self.get_model(model_name)
def ingest_pipeline_model(self, model_path: Path, name: Optional[str] = None) -> PipelineConfig:
"""Insert the components of a diffusers pipeline."""
if model_path.is_file(): # convert to diffusers before ingesting
name = name or model_path.stem
with TemporaryDirectory() as tmp_dir:
_convert_ckpt(model_path, Path(tmp_dir))
result = self._ingest_pipeline_model(Path(tmp_dir), name, source=model_path)
return result
else:
return self._ingest_pipeline_model(model_path, name)
def _ingest_pipeline_model(
self, model_path: Path, name: Optional[str] = None, source: Optional[Path] = None
) -> PipelineConfig:
"""Insert the components of a diffusers pipeline."""
model_name = name or model_path.stem
model_index = model_path / "model_index.json"
assert (
model_index.exists()
), f"{model_path} does not look like a diffusers model: model_index.json is missing" # check that it is a diffuers
with open(model_index, "r") as file:
toc = file.read()
base_type = ModelProbe.probe(model_path).base_type
source = source or model_path
try:
# create a name entry for the pipeline and insert its table of contents
self._cursor.execute(
"""--sql
INSERT INTO model_name (
name, source, description, is_pipeline, table_of_contents
)
VALUES(?, ?, ?, "1", ?);
""",
(model_name, source.as_posix(), f"Normalized pipeline {model_name}", toc),
)
pipeline_id = self._cursor.lastrowid
# now we create or retrieve each of the parts
subdirectories = [x for x in model_path.iterdir() if x.is_dir()]
parts_to_insert = []
bases_to_insert = []
for submodel in subdirectories:
part_name = submodel.stem
part_path = submodel
part_hash = FastModelHash.hash(part_path)
part_id = self._lookup_part_by_hash(part_hash) or self._install_part(part_hash, part_path, {base_type})
parts_to_insert.append((pipeline_id, part_id, part_name))
bases_to_insert.append((part_id, base_type.value))
# insert the parts into the part list
self._cursor.executemany(
"""--sql
INSERT INTO model_parts (
model_id, part_id, part_name
)
VALUES(?, ?, ?);
""",
parts_to_insert,
)
# update the base types - over time each simple model will get tagged
# with all the base types of any pipelines that use it, which is a feature... I think?
self._cursor.executemany(
"""--sql
INSERT OR IGNORE INTO model_base (
part_id, base
)
VALUES(?, ?);
""",
bases_to_insert,
)
self._conn.commit()
except sqlite3.Error as e:
self._conn.rollback()
if isinstance(e, sqlite3.IntegrityError):
raise DuplicateModelException(f"a model named {model_name} is already in the database") from e
else:
raise e
return self.get_pipeline(model_name)
# in this p-o-p implementation, we assume that the model name is unique
def get_model(self, name: str, part: Optional[str] = "root") -> SimpleModelConfig:
"""Fetch a simple model. Use optional `part` to specify the diffusers subfolder."""
self._cursor.execute(
"""--sql
SELECT a.source, a.description, c.type, b.part_name, c.path, d.base
FROM model_name as a,
model_parts as b,
simple_model as c,
model_base as d
WHERE a.name=?
AND a.model_id=b.model_id
AND b.part_id=c.part_id
AND b.part_id=d.part_id
AND b.part_name=?
""",
(name, part),
)
rows = self._cursor.fetchall()
if len(rows) == 0:
raise ModelNotFoundException
bases: Set[BaseModelType] = {BaseModelType(x["base"]) for x in rows}
return SimpleModelConfig(
name=name,
description=rows[0]["description"],
base_models=bases,
type=ExtendedModelType(rows[0]["type"]),
path=Path(rows[0]["path"]),
)
# in this p-o-p implementation, we assume that the model name is unique
def get_pipeline(self, name: str) -> PipelineConfig:
"""Fetch a pipeline model."""
self._cursor.execute(
"""--sql
SELECT a.source, a.description, c.type, b.part_name, c.path, d.base, c.refcount
FROM model_name as a,
model_parts as b,
simple_model as c,
model_base as d
WHERE a.name=?
AND a.model_id=b.model_id
AND b.part_id=c.part_id
AND b.part_id=d.part_id
""",
(name,),
)
rows = self._cursor.fetchall()
if len(rows) == 0:
raise ModelNotFoundException
# Find the intersection of base models supported by each part.
# Need a more pythonic way of doing this!
bases: Dict[str, Set] = dict()
base_union: Set[BaseModelType] = set()
parts = dict()
for row in rows:
part_name = row["part_name"]
base = row["base"]
if not bases.get(part_name):
bases[part_name] = set()
bases[part_name].add(base)
base_union.add(base)
parts[part_name] = ModelPart(row["type"], row["path"], row["refcount"])
for base_set in bases.values():
base_union = base_union.intersection(base_set)
return PipelineConfig(
name=name,
description=rows[0]["description"],
base_models={BaseModelType(x) for x in base_union},
parts=parts,
)
def list_models(self) -> List[ModelListing]:
"""Get a listing of models. No filtering implemented yet."""
# get simple models first
self._cursor.execute(
"""--sql
SELECT name, source, is_pipeline
FROM model_name;
""",
(),
)
results: List[ModelListing] = []
for row in self._cursor.fetchall():
if row["is_pipeline"]:
pipeline = self.get_pipeline(row["name"])
results.append(
ModelListing(
name=pipeline.name,
description=pipeline.description,
source=row["source"],
type=ModelType.Main,
base_models=pipeline.base_models,
)
)
else:
model = self.get_model(row["name"])
results.append(
ModelListing(
name=model.name,
description=model.description,
source=row["source"],
type=model.type,
base_models=model.base_models,
)
)
return results
def export_pipeline(self, name: str, destination: Path) -> Path:
"""Reconstruction the pipeline as a set of symbolic links in folder indicated by destination."""
# get the model_index.json file (the "toc")
self._cursor.execute(
"""--sql
SELECT table_of_contents, is_pipeline
FROM model_name
WHERE name=?
""",
(name,),
)
row = self._cursor.fetchone()
if row is None:
raise ModelNotFoundException
# if the destination exists and is a directory, then we create
# a new subdirectory using the model name
if destination.exists() and destination.is_dir():
destination = destination / name
# now check that the (possibly new) destination doesn't already exist
if destination.exists():
raise OSError(f"{destination}: path or directory exists; won't overwrite")
if row["is_pipeline"]:
# write the toc
toc = row[0]
destination.mkdir(parents=True)
with open(destination / "model_index.json", "w") as model_index:
model_index.write(toc)
# symlink the subfolders
model = self.get_pipeline(name)
for part_name, part_config in model.parts.items():
link_path = destination / part_name  # the symlink to create
link_path.symlink_to(part_config.path)  # its target is the shared blob
else:
model = self.get_model(name)
destination = Path(destination.as_posix() + model.path.suffix)
destination.symlink_to(model.path)
return destination
def _lookup_part_by_hash(self, hash: str) -> Optional[int]:
self._cursor.execute(
"""--sql
SELECT part_id from simple_model
WHERE hash=?;
""",
(hash,),
)
row = self._cursor.fetchone()
if not row:
return None
return row[0]
# may raise an exception
def _install_part(self, model_hash: str, model_path: Path, base_types: Set[BaseModelType] = set()) -> int:
(model_type, model_base) = self._probe_model(model_path)
if model_base is None:
model_bases = base_types
else:
# hack logic to test multiple base type compatibility
model_bases = set()
if model_type == ExtendedModelType("vae") and model_base == BaseModelType("sd-1"):
model_bases = {BaseModelType("sd-1"), BaseModelType("sd-2")}
elif model_base == BaseModelType("any"):
model_bases = {BaseModelType(x) for x in BASE_TYPES}
else:
model_bases = {BaseModelType(model_base)}
# make the storage name slightly easier to interpret
blob_name = model_type.value + "-" + str(uuid4())
if model_path.is_file() and model_path.suffix:
blob_name += model_path.suffix
destination = self._blob_directory / blob_name
assert not destination.exists(), f"a path named {destination} already exists"
if model_path.is_dir():
copytree(model_path, destination)
else:
copy(model_path, destination)
# create an entry in the simple_model table
self._cursor.execute(
"""--sql
INSERT INTO simple_model (
type, hash, path
)
VALUES (?, ?, ?);
""",
(model_type.value, model_hash, destination.as_posix()),
)
# id of the inserted row
part_id = self._cursor.lastrowid
# create base compatibility info
for base in model_bases:
self._cursor.execute(
"""--sql
INSERT INTO model_base (part_id, base)
VALUES (?, ?);
""",
(part_id, BaseModelType(base).value),
)
return part_id
def _create_tables(self):
self._cursor.execute(
f"""--sql
CREATE TABLE IF NOT EXISTS simple_model (
part_id INTEGER PRIMARY KEY,
type TEXT CHECK( type IN ({MODEL_SQL_ENUM}) ) NOT NULL,
hash TEXT UNIQUE,
refcount INTEGER NOT NULL DEFAULT '0',
path TEXT NOT NULL
);
"""
)
self._cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS model_name (
model_id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
source TEXT,
description TEXT,
is_pipeline BOOLEAN NOT NULL DEFAULT '0',
table_of_contents TEXT, -- this is the contents of model_index.json
UNIQUE(name)
);
"""
)
self._cursor.execute(
f"""--sql
CREATE TABLE IF NOT EXISTS model_base (
part_id TEXT NOT NULL,
base TEXT CHECK( base in ({BASE_SQL_ENUM}) ) NOT NULL,
FOREIGN KEY(part_id) REFERENCES simple_model(part_id),
UNIQUE(part_id,base)
);
"""
)
self._cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS model_parts (
model_id INTEGER NOT NULL,
part_id INTEGER NOT NULL,
part_name TEXT DEFAULT 'root', -- to do: use enum
FOREIGN KEY(model_id) REFERENCES model_name(model_id),
FOREIGN KEY(part_id) REFERENCES simple_model(part_id),
UNIQUE(model_id, part_id, part_name)
);
"""
)
self._cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS insert_model_refcount
AFTER INSERT
ON model_parts FOR EACH ROW
BEGIN
UPDATE simple_model SET refcount=refcount+1 WHERE simple_model.part_id=new.part_id;
END;
"""
)
self._cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS delete_model_refcount
AFTER DELETE
ON model_parts FOR EACH ROW
BEGIN
UPDATE simple_model SET refcount=refcount-1 WHERE simple_model.part_id=old.part_id;
END;
"""
)
self._cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS update_model_refcount
AFTER UPDATE
ON model_parts FOR EACH ROW
BEGIN
UPDATE simple_model SET refcount=refcount-1 WHERE simple_model.part_id=old.part_id;
UPDATE simple_model SET refcount=refcount+1 WHERE simple_model.part_id=new.part_id;
END;
"""
)
def _probe_model(self, model_path: Path) -> Tuple[ExtendedModelType, Optional[BaseModelType]]:
try:
model_info = ModelProbe.probe(model_path)
return (model_info.model_type, model_info.base_type)
except InvalidModelException:
# fall back to the submodel folder name (e.g. "scheduler" or "tokenizer"),
# which is itself a valid ExtendedModelType value
return (ExtendedModelType(model_path.stem), None)
# Adapted from invokeai/backend/model_management/models/stable_diffusion.py
# This code should be moved into its own module
def _convert_ckpt(checkpoint_path: Path, output_path: Path) -> Path:
"""
Convert checkpoint model to diffusers format.
The converted model will be stored at output_path.
"""
app_config = InvokeAIAppConfig.get_config()
weights = checkpoint_path
model_info = ModelProbe.probe(checkpoint_path)
base_type = model_info.base_type
variant = model_info.variant_type
pipeline_class = StableDiffusionInpaintPipeline if variant == "inpaint" else StableDiffusionPipeline
config_file = app_config.legacy_conf_path / __select_ckpt_config(base_type, variant)
precision = torch_dtype(choose_torch_device())
model_base_to_model_type = {
BaseModelType.StableDiffusion1: "FrozenCLIPEmbedder",
BaseModelType.StableDiffusion2: "FrozenOpenCLIPEmbedder",
BaseModelType.StableDiffusionXL: "SDXL",
BaseModelType.StableDiffusionXLRefiner: "SDXL-Refiner",
}
convert_ckpt_to_diffusers(
weights.as_posix(),
output_path.as_posix(),
model_type=model_base_to_model_type[base_type],
model_version=base_type,
model_variant=variant,
original_config_file=config_file,
extract_ema=True,
scan_needed=True,
pipeline_class=pipeline_class,
from_safetensors=weights.suffix == ".safetensors",
precision=precision,
)
return output_path
def __select_ckpt_config(version: BaseModelType, variant: ModelVariantType):
ckpt_configs: Dict[BaseModelType, Dict[ModelVariantType, Optional[str]]] = {
BaseModelType.StableDiffusion1: {
ModelVariantType.Normal: "v1-inference.yaml",
ModelVariantType.Inpaint: "v1-inpainting-inference.yaml",
},
BaseModelType.StableDiffusion2: {
ModelVariantType.Normal: "v2-inference-v.yaml", # best guess, as we can't differentiate with base(512)
ModelVariantType.Inpaint: "v2-inpainting-inference.yaml",
ModelVariantType.Depth: "v2-midas-inference.yaml",
},
BaseModelType.StableDiffusionXL: {
ModelVariantType.Normal: "sd_xl_base.yaml",
ModelVariantType.Inpaint: None,
ModelVariantType.Depth: None,
},
BaseModelType.StableDiffusionXLRefiner: {
ModelVariantType.Normal: "sd_xl_refiner.yaml",
ModelVariantType.Inpaint: None,
ModelVariantType.Depth: None,
},
}
return ckpt_configs[version][variant]

View File

@@ -140,6 +140,7 @@ dependencies = [
"invokeai-node-web" = "invokeai.app.api_app:invoke_api"
"invokeai-import-images" = "invokeai.frontend.install.import_images:main"
"invokeai-db-maintenance" = "invokeai.backend.util.db_maintenance:main"
"invokeai-nmm" = "invokeai.backend.normalized_mm.cli:main"
[project.urls]
"Homepage" = "https://invoke-ai.github.io/InvokeAI/"