fix(mm): fix a lot of typing issues

Most fixes are just things being typed as `str` but having default values of `None`, but there are some minor logic changes.
This commit is contained in:
psychedelicious 2023-08-01 17:55:13 +10:00
parent 9ba50130a1
commit 66f524cae7
4 changed files with 36 additions and 29 deletions

View File

@ -3,9 +3,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from logging import Logger
from pathlib import Path
from pydantic import Field
from typing import Optional, Union, Callable, List, Tuple, TYPE_CHECKING
from typing import Literal, Optional, Union, Callable, List, Tuple, TYPE_CHECKING
from types import ModuleType
from invokeai.backend.model_management import (
@ -193,7 +194,7 @@ class ModelManagerServiceBase(ABC):
self,
model_name: str,
base_model: BaseModelType,
model_type: Union[ModelType.Main, ModelType.Vae],
model_type: Literal[ModelType.Main, ModelType.Vae],
) -> AddModelResult:
"""
Convert a checkpoint file into a diffusers folder, deleting the cached
@ -292,7 +293,7 @@ class ModelManagerService(ModelManagerServiceBase):
def __init__(
self,
config: InvokeAIAppConfig,
logger: ModuleType,
logger: Logger,
):
"""
Initialize with the path to the models.yaml config file.
@ -396,7 +397,7 @@ class ModelManagerService(ModelManagerServiceBase):
model_type,
)
def model_info(self, model_name: str, base_model: BaseModelType, model_type: ModelType) -> dict:
def model_info(self, model_name: str, base_model: BaseModelType, model_type: ModelType) -> Union[dict, None]:
"""
Given a model name returns a dict-like (OmegaConf) object describing it.
"""
@ -416,7 +417,7 @@ class ModelManagerService(ModelManagerServiceBase):
"""
return self.mgr.list_models(base_model, model_type)
def list_model(self, model_name: str, base_model: BaseModelType, model_type: ModelType) -> dict:
def list_model(self, model_name: str, base_model: BaseModelType, model_type: ModelType) -> Union[dict, None]:
"""
Return information about the model using the same format as list_models()
"""
@ -429,7 +430,7 @@ class ModelManagerService(ModelManagerServiceBase):
model_type: ModelType,
model_attributes: dict,
clobber: bool = False,
) -> None:
) -> AddModelResult:
"""
Update the named model with a dictionary of attributes. Will fail with an
assertion error if the name already exists. Pass clobber=True to overwrite.
@ -478,7 +479,7 @@ class ModelManagerService(ModelManagerServiceBase):
self,
model_name: str,
base_model: BaseModelType,
model_type: Union[ModelType.Main, ModelType.Vae],
model_type: Literal[ModelType.Main, ModelType.Vae],
convert_dest_directory: Optional[Path] = Field(
default=None, description="Optional directory location for merged model"
),
@ -573,9 +574,9 @@ class ModelManagerService(ModelManagerServiceBase):
default=None, description="Base model shared by all models to be merged"
),
merged_model_name: str = Field(default=None, description="Name of destination model after merging"),
alpha: Optional[float] = 0.5,
alpha: float = 0.5,
interp: Optional[MergeInterpolationMethod] = None,
force: Optional[bool] = False,
force: bool = False,
merge_dest_directory: Optional[Path] = Field(
default=None, description="Optional directory location for merged model"
),
@ -633,8 +634,8 @@ class ModelManagerService(ModelManagerServiceBase):
model_name: str,
base_model: BaseModelType,
model_type: ModelType,
new_name: str = None,
new_base: BaseModelType = None,
new_name: Optional[str] = None,
new_base: Optional[BaseModelType] = None,
):
"""
Rename the indicated model. Can provide a new name and/or a new base.

View File

@ -101,9 +101,9 @@ class ModelInstall(object):
def __init__(
self,
config: InvokeAIAppConfig,
prediction_type_helper: Callable[[Path], SchedulerPredictionType] = None,
model_manager: ModelManager = None,
access_token: str = None,
prediction_type_helper: Optional[Callable[[Path], SchedulerPredictionType]] = None,
model_manager: Optional[ModelManager] = None,
access_token: Optional[str] = None,
):
self.config = config
self.mgr = model_manager or ModelManager(config.model_conf_path)

View File

@ -234,7 +234,7 @@ import textwrap
import yaml
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, List, Tuple, Union, Dict, Set, Callable, types
from typing import Literal, Optional, List, Tuple, Union, Dict, Set, Callable, types
from shutil import rmtree, move
import torch
@ -518,7 +518,7 @@ class ModelManager(object):
model_name: str,
base_model: BaseModelType,
model_type: ModelType,
) -> dict:
) -> Union[dict, None]:
"""
Given a model name returns the OmegaConf (dict-like) object describing it.
"""
@ -540,13 +540,15 @@ class ModelManager(object):
model_name: str,
base_model: BaseModelType,
model_type: ModelType,
) -> dict:
) -> Union[dict, None]:
"""
Returns a dict describing one installed model, using
the combined format of the list_models() method.
"""
models = self.list_models(base_model, model_type, model_name)
return models[0] if models else None
if len(models) > 1:
return models[0]
return None
def list_models(
self,
@ -560,7 +562,7 @@ class ModelManager(object):
model_keys = (
[self.create_key(model_name, base_model, model_type)]
if model_name
if model_name and base_model and model_type
else sorted(self.models, key=str.casefold)
)
models = []
@ -596,7 +598,7 @@ class ModelManager(object):
Print a table of models and their descriptions. This needs to be redone
"""
# TODO: redo
for model_type, model_dict in self.list_models().items():
for model_dict in self.list_models():
for model_name, model_info in model_dict.items():
line = f'{model_info["name"]:25s} {model_info["type"]:10s} {model_info["description"]}'
print(line)
@ -699,8 +701,8 @@ class ModelManager(object):
model_name: str,
base_model: BaseModelType,
model_type: ModelType,
new_name: str = None,
new_base: BaseModelType = None,
new_name: Optional[str] = None,
new_base: Optional[BaseModelType] = None,
):
"""
Rename or rebase a model.
@ -753,7 +755,7 @@ class ModelManager(object):
self,
model_name: str,
base_model: BaseModelType,
model_type: Union[ModelType.Main, ModelType.Vae],
model_type: Literal[ModelType.Main, ModelType.Vae],
dest_directory: Optional[Path] = None,
) -> AddModelResult:
"""
@ -767,6 +769,10 @@ class ModelManager(object):
This will raise a ValueError unless the model is a checkpoint.
"""
info = self.model_info(model_name, base_model, model_type)
if info is None:
raise FileNotFoundError(f"model not found: {model_name}")
if info["model_format"] != "checkpoint":
raise ValueError(f"not a checkpoint format model: {model_name}")
@ -836,7 +842,7 @@ class ModelManager(object):
return search_folder, found_models
def commit(self, conf_file: Path = None) -> None:
def commit(self, conf_file: Optional[Path] = None) -> None:
"""
Write current configuration out to the indicated file.
"""
@ -983,7 +989,7 @@ class ModelManager(object):
# LS: hacky
# Patch in the SD VAE from core so that it is available for use by the UI
try:
self.heuristic_import({self.resolve_model_path("core/convert/sd-vae-ft-mse")})
self.heuristic_import({str(self.resolve_model_path("core/convert/sd-vae-ft-mse"))})
except:
pass
@ -1011,7 +1017,7 @@ class ModelManager(object):
def heuristic_import(
self,
items_to_import: Set[str],
prediction_type_helper: Callable[[Path], SchedulerPredictionType] = None,
prediction_type_helper: Optional[Callable[[Path], SchedulerPredictionType]] = None,
) -> Dict[str, AddModelResult]:
"""Import a list of paths, repo_ids or URLs. Returns the set of
successfully imported items.

View File

@ -33,7 +33,7 @@ class ModelMerger(object):
self,
model_paths: List[Path],
alpha: float = 0.5,
interp: MergeInterpolationMethod = None,
interp: Optional[MergeInterpolationMethod] = None,
force: bool = False,
**kwargs,
) -> DiffusionPipeline:
@ -73,7 +73,7 @@ class ModelMerger(object):
base_model: Union[BaseModelType, str],
merged_model_name: str,
alpha: float = 0.5,
interp: MergeInterpolationMethod = None,
interp: Optional[MergeInterpolationMethod] = None,
force: bool = False,
merge_dest_directory: Optional[Path] = None,
**kwargs,
@ -122,7 +122,7 @@ class ModelMerger(object):
dump_path.mkdir(parents=True, exist_ok=True)
dump_path = dump_path / merged_model_name
merged_pipe.save_pretrained(dump_path, safe_serialization=1)
merged_pipe.save_pretrained(dump_path, safe_serialization=True)
attributes = dict(
path=str(dump_path),
description=f"Merge of models {', '.join(model_names)}",