autoimport from embedding/controlnet/lora folders designated in startup file

This commit is contained in:
Lincoln Stein 2023-06-27 12:30:53 -04:00
parent f15d28d141
commit e8ed0fad6c
7 changed files with 172 additions and 123 deletions

View File

@ -374,8 +374,10 @@ setting environment variables INVOKEAI_<setting>.
tiled_decode : bool = Field(default=False, description="Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty)", category='Memory/Performance')
root : Path = Field(default=_find_root(), description='InvokeAI runtime root directory', category='Paths')
autoimport_dir : Path = Field(default='autoimport', description='Path to a directory of models files to be imported on startup.', category='Paths')
autoconvert_dir : Path = Field(default=None, description='Deprecated configuration option.', category='Paths')
autoimport_dir : Path = Field(default='autoimport/main', description='Path to a directory of models files to be imported on startup.', category='Paths')
lora_dir : Path = Field(default='autoimport/lora', description='Path to a directory of LoRA/LyCORIS models to be imported on startup.', category='Paths')
embedding_dir : Path = Field(default='autoimport/embedding', description='Path to a directory of Textual Inversion embeddings to be imported on startup.', category='Paths')
controlnet_dir : Path = Field(default='autoimport/controlnet', description='Path to a directory of ControlNet embeddings to be imported on startup.', category='Paths')
conf_path : Path = Field(default='configs/models.yaml', description='Path to models definition file', category='Paths')
models_dir : Path = Field(default='models', description='Path to the models directory', category='Paths')
legacy_conf_dir : Path = Field(default='configs/stable-diffusion', description='Path to directory of legacy checkpoint config files', category='Paths')

View File

@ -442,6 +442,26 @@ to allow InvokeAI to download restricted styles & subjects from the "Concept Lib
scroll_exit=True,
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.FixedText,
value="Directories containing textual inversion, controlnet and LoRA models (<tab> autocompletes, ctrl-N advances):",
editable=False,
color="CONTROL",
)
self.autoimport_dirs = {}
for description, config_name, path in autoimport_paths(old_opts):
self.autoimport_dirs[config_name] = self.add_widget_intelligent(
npyscreen.TitleFilename,
name=description+':',
value=str(path),
select_dir=True,
must_exist=False,
use_two_lines=False,
labelColor="GOOD",
begin_entry_at=32,
scroll_exit=True
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name="== LICENSE ==",
@ -505,10 +525,6 @@ https://huggingface.co/spaces/CompVis/stable-diffusion-license
bad_fields.append(
f"The output directory does not seem to be valid. Please check that {str(Path(opt.outdir).parent)} is an existing directory."
)
# if not Path(opt.embedding_dir).parent.exists():
# bad_fields.append(
# f"The embedding directory does not seem to be valid. Please check that {str(Path(opt.embedding_dir).parent)} is an existing directory."
# )
if len(bad_fields) > 0:
message = "The following problems were detected and must be corrected:\n"
for problem in bad_fields:
@ -528,12 +544,15 @@ https://huggingface.co/spaces/CompVis/stable-diffusion-license
"max_loaded_models",
"xformers_enabled",
"always_use_cpu",
# "embedding_dir",
# "lora_dir",
# "controlnet_dir",
]:
setattr(new_opts, attr, getattr(self, attr).value)
for attr in self.autoimport_dirs:
directory = Path(self.autoimport_dirs[attr].value)
if directory.is_relative_to(config.root_path):
directory = directory.relative_to(config.root_path)
setattr(new_opts, attr, directory)
new_opts.hf_token = self.hf_token.value
new_opts.license_acceptance = self.license_acceptance.value
new_opts.precision = PRECISION_CHOICES[self.precision.value[0]]
@ -595,22 +614,32 @@ def default_user_selections(program_opts: Namespace) -> InstallSelections:
else [models[x].path or models[x].repo_id for x in installer.recommended_models()]
if program_opts.yes_to_all
else list(),
scan_directory=None,
autoscan_on_startup=None,
# scan_directory=None,
# autoscan_on_startup=None,
)
# -------------------------------------
def autoimport_paths(config: InvokeAIAppConfig):
return [
('Checkpoints & diffusers models', 'autoimport_dir', config.root_path / config.autoimport_dir),
('LoRA/LyCORIS models', 'lora_dir', config.root_path / config.lora_dir),
('Controlnet models', 'controlnet_dir', config.root_path / config.controlnet_dir),
('Textual Inversion Embeddings', 'embedding_dir', config.root_path / config.embedding_dir),
]
# -------------------------------------
def initialize_rootdir(root: Path, yes_to_all: bool = False):
logger.info("** INITIALIZING INVOKEAI RUNTIME DIRECTORY **")
for name in (
"models",
"databases",
"autoimport",
"text-inversion-output",
"text-inversion-training-data",
"configs"
):
os.makedirs(os.path.join(root, name), exist_ok=True)
for model_type in ModelType:
Path(root, 'autoimport', model_type.value).mkdir(parents=True, exist_ok=True)
configs_src = Path(configs.__path__[0])
configs_dest = root / "configs"
@ -618,9 +647,8 @@ def initialize_rootdir(root: Path, yes_to_all: bool = False):
shutil.copytree(configs_src, configs_dest, dirs_exist_ok=True)
dest = root / 'models'
for model_base in [BaseModelType.StableDiffusion1,BaseModelType.StableDiffusion2]:
for model_type in [ModelType.Main, ModelType.Vae, ModelType.Lora,
ModelType.ControlNet,ModelType.TextualInversion]:
for model_base in BaseModelType:
for model_type in ModelType:
path = dest / model_base.value / model_type.value
path.mkdir(parents=True, exist_ok=True)
path = dest / 'core'
@ -632,8 +660,6 @@ def initialize_rootdir(root: Path, yes_to_all: bool = False):
}
)
)
# with open(root / 'invokeai.yaml','w') as f:
# f.write('#empty invokeai.yaml initialization file')
# -------------------------------------
def run_console_ui(

View File

@ -70,8 +70,8 @@ class ModelInstallList:
class InstallSelections():
install_models: List[str]= field(default_factory=list)
remove_models: List[str]=field(default_factory=list)
scan_directory: Path = None
autoscan_on_startup: bool=False
# scan_directory: Path = None
# autoscan_on_startup: bool=False
@dataclass
class ModelLoadInfo():
@ -155,8 +155,8 @@ class ModelInstall(object):
def install(self, selections: InstallSelections):
job = 1
jobs = len(selections.remove_models) + len(selections.install_models)
if selections.scan_directory:
jobs += 1
# if selections.scan_directory:
# jobs += 1
# remove requested models
for key in selections.remove_models:
@ -171,18 +171,8 @@ class ModelInstall(object):
self.heuristic_install(path)
job += 1
# import from the scan directory, if any
if path := selections.scan_directory:
logger.info(f'Scanning and importing models from directory {path} [{job}/{jobs}]')
self.heuristic_install(path)
self.mgr.commit()
if selections.autoscan_on_startup and Path(selections.scan_directory).is_dir():
update_autoimport_dir(selections.scan_directory)
else:
update_autoimport_dir(None)
def heuristic_install(self,
model_path_id_or_url: Union[str,Path],
models_installed: Set[Path]=None)->Set[Path]:
@ -237,7 +227,7 @@ class ModelInstall(object):
self.mgr.add_model(model_name = model_name,
base_model = info.base_type,
model_type = info.model_type,
model_attributes = attributes
model_attributes = attributes,
)
except Exception as e:
logger.warning(f'{str(e)} Skipping registration.')
@ -309,11 +299,11 @@ class ModelInstall(object):
return location.stem
def _make_attributes(self, path: Path, info: ModelProbeInfo)->dict:
# convoluted way to retrieve the description from datasets
description = f'{info.base_type.value} {info.model_type.value} model'
model_name = path.name if path.is_dir() else path.stem
description = f'{info.base_type.value} {info.model_type.value} model {model_name}'
if key := self.reverse_paths.get(self.current_id):
if key in self.datasets:
description = self.datasets[key]['description']
description = self.datasets[key].get('description') or description
rel_path = self.relative_to_root(path)
@ -395,19 +385,6 @@ class ModelInstall(object):
'''
return {v.get('path') or v.get('repo_id') : k for k, v in datasets.items()}
def update_autoimport_dir(autodir: Path):
'''
Update the "autoimport_dir" option in invokeai.yaml
'''
invokeai_config_path = config.init_file_path
conf = OmegaConf.load(invokeai_config_path)
conf.InvokeAI.Paths.autoimport_dir = str(autodir) if autodir else None
yaml = OmegaConf.to_yaml(conf)
tmpfile = invokeai_config_path.parent / "new_config.tmp"
with open(tmpfile, "w", encoding="utf-8") as outfile:
outfile.write(yaml)
tmpfile.replace(invokeai_config_path)
# -------------------------------------
def yes_or_no(prompt: str, default_yes=True):
default = "y" if default_yes else "n"

View File

@ -168,11 +168,27 @@ structure at initialization time by scanning the models directory. The
in-memory data structure can be resynchronized by calling
`manager.scan_models_directory()`.
Files and folders placed inside the `autoimport_dir` (path defined in
`invokeai.yaml`, defaulting to `ROOTDIR/autoimport` will also be
scanned for new models at initialization time and added to
`models.yaml`. Files will not be moved from this location but
preserved in-place.
Files and folders placed inside the `autoimport` paths (paths
defined in `invokeai.yaml`) will also be scanned for new models at
initialization time and added to `models.yaml`. Files will not be
moved from this location but preserved in-place. These directories
are:
configuration default description
------------- ------- -----------
autoimport_dir autoimport/main main models
lora_dir autoimport/lora LoRA/LyCORIS models
embedding_dir autoimport/embedding TI embeddings
controlnet_dir autoimport/controlnet ControlNet models
In actuality, models located in any of these directories are scanned
to determine their type, so it isn't strictly necessary to organize
the different types in this way. This entry in `invokeai.yaml` will
recursively scan all subdirectories within `autoimport`, scan models
files it finds, and import them if recognized.
Paths:
autoimport_dir: autoimport
A model can be manually added using `add_model()` using the model's
name, base model, type and a dict of model attributes. See
@ -208,6 +224,7 @@ checkpoint or safetensors file.
The path points to a file or directory on disk. If a relative path,
the root is the InvokeAI ROOTDIR.
"""
from __future__ import annotations
@ -720,19 +737,27 @@ class ModelManager(object):
)
installed = set()
if not self.app_config.autoimport_dir:
return installed
autodir = self.app_config.root_path / self.app_config.autoimport_dir
if not (autodir and autodir.exists()):
return installed
known_paths = {(self.app_config.root_path / x['path']).resolve() for x in self.list_models()}
config = self.app_config
known_paths = {(self.app_config.root_path / x['path']) for x in self.list_models()}
scanned_dirs = set()
for autodir in [config.autoimport_dir,
config.lora_dir,
config.embedding_dir,
config.controlnet_dir]:
if autodir is None:
continue
autodir = self.app_config.root_path / autodir
if not autodir.exists():
continue
for root, dirs, files in os.walk(autodir):
for d in dirs:
path = Path(root) / d
if path in known_paths:
if path in known_paths or path.parent in scanned_dirs:
scanned_dirs.add(path)
continue
if any([(path/x).exists() for x in {'config.json','model_index.json','learned_embeds.bin'}]):
installed.update(installer.heuristic_install(path))
@ -742,7 +767,7 @@ class ModelManager(object):
path = Path(root) / f
if path in known_paths or path.parent in scanned_dirs:
continue
if path.suffix in {'.ckpt','.bin','.pth','.safetensors'}:
if path.suffix in {'.ckpt','.bin','.pth','.safetensors','.pt'}:
installed.update(installer.heuristic_install(path))
return installed

View File

@ -22,7 +22,7 @@ class ModelProbeInfo(object):
variant_type: ModelVariantType
prediction_type: SchedulerPredictionType
upcast_attention: bool
format: Literal['diffusers','checkpoint']
format: Literal['diffusers','checkpoint', 'lycoris']
image_size: int
class ProbeBase(object):
@ -75,22 +75,23 @@ class ModelProbe(object):
between V2-Base and V2-768 SD models.
'''
if model_path:
format = 'diffusers' if model_path.is_dir() else 'checkpoint'
format_type = 'diffusers' if model_path.is_dir() else 'checkpoint'
else:
format = 'diffusers' if isinstance(model,(ConfigMixin,ModelMixin)) else 'checkpoint'
format_type = 'diffusers' if isinstance(model,(ConfigMixin,ModelMixin)) else 'checkpoint'
model_info = None
try:
model_type = cls.get_model_type_from_folder(model_path, model) \
if format == 'diffusers' \
if format_type == 'diffusers' \
else cls.get_model_type_from_checkpoint(model_path, model)
probe_class = cls.PROBES[format].get(model_type)
probe_class = cls.PROBES[format_type].get(model_type)
if not probe_class:
return None
probe = probe_class(model_path, model, prediction_type_helper)
base_type = probe.get_base_type()
variant_type = probe.get_variant_type()
prediction_type = probe.get_scheduler_prediction_type()
format = probe.get_format()
model_info = ModelProbeInfo(
model_type = model_type,
base_type = base_type,
@ -116,10 +117,10 @@ class ModelProbe(object):
if model_path.name == "learned_embeds.bin":
return ModelType.TextualInversion
checkpoint = checkpoint or read_checkpoint_meta(model_path, scan=True)
checkpoint = checkpoint.get("state_dict", checkpoint)
ckpt = checkpoint if checkpoint else read_checkpoint_meta(model_path, scan=True)
ckpt = ckpt.get("state_dict", ckpt)
for key in checkpoint.keys():
for key in ckpt.keys():
if any(key.startswith(v) for v in {"cond_stage_model.", "first_stage_model.", "model.diffusion_model."}):
return ModelType.Main
elif any(key.startswith(v) for v in {"encoder.conv_in", "decoder.conv_in"}):
@ -133,7 +134,7 @@ class ModelProbe(object):
else:
# diffusers-ti
if len(checkpoint) < 10 and all(isinstance(v, torch.Tensor) for v in checkpoint.values()):
if len(ckpt) < 10 and all(isinstance(v, torch.Tensor) for v in ckpt.values()):
return ModelType.TextualInversion
raise ValueError("Unable to determine model type")
@ -201,6 +202,9 @@ class ProbeBase(object):
def get_scheduler_prediction_type(self)->SchedulerPredictionType:
pass
def get_format(self)->str:
pass
class CheckpointProbeBase(ProbeBase):
def __init__(self,
checkpoint_path: Path,
@ -214,6 +218,9 @@ class CheckpointProbeBase(ProbeBase):
def get_base_type(self)->BaseModelType:
pass
def get_format(self)->str:
return 'checkpoint'
def get_variant_type(self)-> ModelVariantType:
model_type = ModelProbe.get_model_type_from_checkpoint(self.checkpoint_path,self.checkpoint)
if model_type != ModelType.Main:
@ -267,6 +274,9 @@ class VaeCheckpointProbe(CheckpointProbeBase):
return BaseModelType.StableDiffusion1
class LoRACheckpointProbe(CheckpointProbeBase):
def get_format(self)->str:
return 'lycoris'
def get_base_type(self)->BaseModelType:
checkpoint = self.checkpoint
key1 = "lora_te_text_model_encoder_layers_0_mlp_fc1.lora_down.weight"
@ -286,6 +296,9 @@ class LoRACheckpointProbe(CheckpointProbeBase):
return None
class TextualInversionCheckpointProbe(CheckpointProbeBase):
def get_format(self)->str:
return None
def get_base_type(self)->BaseModelType:
checkpoint = self.checkpoint
if 'string_to_token' in checkpoint:
@ -332,6 +345,9 @@ class FolderProbeBase(ProbeBase):
def get_variant_type(self)->ModelVariantType:
return ModelVariantType.Normal
def get_format(self)->str:
return 'diffusers'
class PipelineFolderProbe(FolderProbeBase):
def get_base_type(self)->BaseModelType:
if self.model:
@ -387,6 +403,9 @@ class VaeFolderProbe(FolderProbeBase):
return BaseModelType.StableDiffusion1
class TextualInversionFolderProbe(FolderProbeBase):
def get_format(self)->str:
return None
def get_base_type(self)->BaseModelType:
path = self.folder_path / 'learned_embeds.bin'
if not path.exists():

View File

@ -397,7 +397,7 @@ def read_checkpoint_meta(path: Union[str, Path], scan: bool = False):
checkpoint = safetensors.torch.load_file(path, device="cpu")
else:
if scan:
scan_result = scan_file_path(checkpoint)
scan_result = scan_file_path(path)
if scan_result.infected_files != 0:
raise Exception(f"The model file \"{path}\" is potentially infected by malware. Aborting import.")
checkpoint = torch.load(path, map_location=torch.device("meta"))

View File

@ -131,7 +131,7 @@ class addModelsForm(CyclingForm, npyscreen.FormMultiPage):
window_width=window_width,
exclude = self.starter_models
)
self.pipeline_models['autoload_pending'] = True
# self.pipeline_models['autoload_pending'] = True
bottom_of_table = max(bottom_of_table,self.nextrely)
self.nextrely = top_of_table
@ -316,31 +316,31 @@ class addModelsForm(CyclingForm, npyscreen.FormMultiPage):
**kwargs,
)
label = "Directory to scan for models to automatically import (<tab> autocompletes):"
self.nextrely += 1
widgets.update(
autoload_directory = self.add_widget_intelligent(
FileBox,
max_height=3,
name=label,
value=str(config.root_path / config.autoimport_dir) if config.autoimport_dir else None,
select_dir=True,
must_exist=True,
use_two_lines=False,
labelColor="DANGER",
begin_entry_at=len(label)+1,
scroll_exit=True,
)
)
widgets.update(
autoscan_on_startup = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Scan and import from this directory each time InvokeAI starts",
value=config.autoimport_dir is not None,
relx=4,
scroll_exit=True,
)
)
# label = "Directory to scan for models to automatically import (<tab> autocompletes):"
# self.nextrely += 1
# widgets.update(
# autoload_directory = self.add_widget_intelligent(
# FileBox,
# max_height=3,
# name=label,
# value=str(config.root_path / config.autoimport_dir) if config.autoimport_dir else None,
# select_dir=True,
# must_exist=True,
# use_two_lines=False,
# labelColor="DANGER",
# begin_entry_at=len(label)+1,
# scroll_exit=True,
# )
# )
# widgets.update(
# autoscan_on_startup = self.add_widget_intelligent(
# npyscreen.Checkbox,
# name="Scan and import from this directory each time InvokeAI starts",
# value=config.autoimport_dir is not None,
# relx=4,
# scroll_exit=True,
# )
# )
return widgets
def resize(self):
@ -501,8 +501,8 @@ class addModelsForm(CyclingForm, npyscreen.FormMultiPage):
# rebuild the form, saving and restoring some of the fields that need to be preserved.
saved_messages = self.monitor.entry_widget.values
autoload_dir = str(config.root_path / self.pipeline_models['autoload_directory'].value)
autoscan = self.pipeline_models['autoscan_on_startup'].value
# autoload_dir = str(config.root_path / self.pipeline_models['autoload_directory'].value)
# autoscan = self.pipeline_models['autoscan_on_startup'].value
app.main_form = app.addForm(
"MAIN", addModelsForm, name="Install Stable Diffusion Models", multipage=self.multipage,
@ -511,8 +511,8 @@ class addModelsForm(CyclingForm, npyscreen.FormMultiPage):
app.main_form.monitor.entry_widget.values = saved_messages
app.main_form.monitor.entry_widget.buffer([''],scroll_end=True)
app.main_form.pipeline_models['autoload_directory'].value = autoload_dir
app.main_form.pipeline_models['autoscan_on_startup'].value = autoscan
# app.main_form.pipeline_models['autoload_directory'].value = autoload_dir
# app.main_form.pipeline_models['autoscan_on_startup'].value = autoscan
def marshall_arguments(self):
"""
@ -546,17 +546,17 @@ class addModelsForm(CyclingForm, npyscreen.FormMultiPage):
selections.install_models.extend(downloads.value.split())
# load directory and whether to scan on startup
if self.parentApp.autoload_pending:
selections.scan_directory = str(config.root_path / self.pipeline_models['autoload_directory'].value)
self.parentApp.autoload_pending = False
selections.autoscan_on_startup = self.pipeline_models['autoscan_on_startup'].value
# if self.parentApp.autoload_pending:
# selections.scan_directory = str(config.root_path / self.pipeline_models['autoload_directory'].value)
# self.parentApp.autoload_pending = False
# selections.autoscan_on_startup = self.pipeline_models['autoscan_on_startup'].value
class AddModelApplication(npyscreen.NPSAppManaged):
def __init__(self,opt):
super().__init__()
self.program_opts = opt
self.user_cancelled = False
self.autoload_pending = True
# self.autoload_pending = True
self.install_selections = InstallSelections()
def onStart(self):