reformat with black and isort

This commit is contained in:
Lincoln Stein 2023-02-21 14:12:57 -05:00
parent 4878c7a2d5
commit 5a4967582e
3 changed files with 307 additions and 225 deletions

View File

@ -177,6 +177,7 @@ def download_with_progress_bar(model_url: str, model_dest: str, label: str = "th
print(f"Error downloading {label} model", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
# ---------------------------------------------
# this will preload the Bert tokenizer fles
def download_bert():
@ -284,37 +285,36 @@ def download_safety_checker():
download_from_hf(StableDiffusionSafetyChecker, safety_model_id)
print("...success", file=sys.stderr)
# -------------------------------------
def download_vaes(precision: str):
print("Installing stabilityai VAE...", file=sys.stderr)
try:
# first the diffusers version
repo_id = 'stabilityai/sd-vae-ft-mse'
repo_id = "stabilityai/sd-vae-ft-mse"
args = dict(
cache_dir=global_cache_dir('diffusers'),
cache_dir=global_cache_dir("diffusers"),
)
if precision=='float16':
args.update(
torch_dtype=torch.float16,
revision='fp16'
)
if precision == "float16":
args.update(torch_dtype=torch.float16, revision="fp16")
if not AutoencoderKL.from_pretrained(repo_id, **args):
raise Exception(f'download of {repo_id} failed')
raise Exception(f"download of {repo_id} failed")
repo_id = 'stabilityai/sd-vae-ft-mse-original'
model_name = 'vae-ft-mse-840000-ema-pruned.ckpt'
repo_id = "stabilityai/sd-vae-ft-mse-original"
model_name = "vae-ft-mse-840000-ema-pruned.ckpt"
# next the legacy checkpoint version
if not hf_download_with_resume(
repo_id = repo_id,
model_name = model_name,
model_dir = str(Globals.root / Model_dir / Weights_dir)
repo_id=repo_id,
model_name=model_name,
model_dir=str(Globals.root / Model_dir / Weights_dir),
):
raise Exception(f'download of {model_name} failed')
raise Exception(f"download of {model_name} failed")
print("...downloaded successfully", file=sys.stderr)
except Exception as e:
print(f"Error downloading StabilityAI standard VAE: {str(e)}", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
# -------------------------------------
def get_root(root: str = None) -> str:
if root:
@ -329,7 +329,7 @@ class editOptsForm(npyscreen.FormMultiPage):
def create(self):
program_opts = self.parentApp.program_opts
old_opts = self.parentApp.invokeai_opts
with open('log.txt','w') as f:
with open("log.txt", "w") as f:
f.write(str(old_opts))
first_time = not (Globals.root / Globals.initfile).exists()
access_token = HfFolder.get_token()
@ -576,14 +576,14 @@ class editOptsForm(npyscreen.FormMultiPage):
new_opts = Namespace()
for attr in [
"outdir",
"safety_checker",
"free_gpu_mem",
"max_loaded_models",
"xformers",
"always_use_cpu",
"embedding_path",
"ckpt_convert",
"outdir",
"safety_checker",
"free_gpu_mem",
"max_loaded_models",
"xformers",
"always_use_cpu",
"embedding_path",
"ckpt_convert",
]:
setattr(new_opts, attr, getattr(self, attr).value)
@ -672,7 +672,9 @@ def initialize_rootdir(root: str, yes_to_all: bool = False):
# -------------------------------------
def run_console_ui(program_opts: Namespace, initfile: Path=None) -> (Namespace, Namespace):
def run_console_ui(
program_opts: Namespace, initfile: Path = None
) -> (Namespace, Namespace):
# parse_args() will read from init file if present
invokeai_opts = default_startup_options(initfile)
editApp = EditOptApplication(program_opts, invokeai_opts)
@ -747,6 +749,7 @@ def write_default_options(program_opts: Namespace, initfile: Path):
opt.hf_token = HfFolder.get_token()
write_opts(opt, initfile)
# -------------------------------------
def main():
parser = argparse.ArgumentParser(description="InvokeAI model downloader")
@ -816,7 +819,9 @@ def main():
if opt.yes_to_all:
write_default_options(opt, init_file)
init_options = Namespace(precision='float32' if opt.full_precision else 'float16')
init_options = Namespace(
precision="float32" if opt.full_precision else "float16"
)
else:
init_options, models_to_download = run_console_ui(opt, init_file)
if init_options:

View File

@ -4,10 +4,10 @@
# run this script from one with internet connectivity. The
# two machines must share a common .cache directory.
'''
"""
This is the npyscreen frontend to the model installation application.
The work is actually done in backend code in model_install_backend.py.
'''
"""
import argparse
import curses
@ -15,25 +15,26 @@ import os
import sys
import traceback
from argparse import Namespace
from pathlib import Path
from typing import List
import npyscreen
import torch
from pathlib import Path
from npyscreen import widget
from omegaconf import OmegaConf
from ..devices import choose_precision, choose_torch_device
from ..globals import Globals, global_config_dir
from .widgets import MultiSelectColumns, TextBox
from .model_install_backend import (Dataset_path, default_config_file,
default_dataset, get_root,
install_requested_models,
default_dataset, recommended_datasets, get_root
)
recommended_datasets)
from .widgets import MultiSelectColumns, TextBox
class addModelsForm(npyscreen.FormMultiPage):
def __init__(self, parentApp, name, multipage=False, *args, **keywords):
self.multipage=multipage
self.multipage = multipage
self.initial_models = OmegaConf.load(Dataset_path)
try:
self.existing_models = OmegaConf.load(default_config_file())
@ -42,7 +43,7 @@ class addModelsForm(npyscreen.FormMultiPage):
self.starter_model_list = [
x for x in list(self.initial_models.keys()) if x not in self.existing_models
]
self.installed_models=dict()
self.installed_models = dict()
super().__init__(parentApp=parentApp, name=name, *args, **keywords)
def create(self):
@ -54,19 +55,17 @@ class addModelsForm(npyscreen.FormMultiPage):
if self.initial_models[x].get("recommended", False)
]
self.installed_models = sorted(
[
x for x in list(self.initial_models.keys()) if x in self.existing_models
]
[x for x in list(self.initial_models.keys()) if x in self.existing_models]
)
self.nextrely -= 1
self.add_widget_intelligent(
npyscreen.FixedText,
value='Use ctrl-N and ctrl-P to move to the <N>ext and <P>revious fields,',
value="Use ctrl-N and ctrl-P to move to the <N>ext and <P>revious fields,",
editable=False,
)
self.add_widget_intelligent(
npyscreen.FixedText,
value='cursor arrows to make a selection, and space to toggle checkboxes.',
value="cursor arrows to make a selection, and space to toggle checkboxes.",
editable=False,
)
self.nextrely += 1
@ -84,17 +83,17 @@ class addModelsForm(npyscreen.FormMultiPage):
MultiSelectColumns,
columns=columns,
values=self.installed_models,
value=[x for x in range(0,len(self.installed_models))],
max_height=1+len(self.installed_models) // columns,
relx = 4,
value=[x for x in range(0, len(self.installed_models))],
max_height=1 + len(self.installed_models) // columns,
relx=4,
slow_scroll=True,
scroll_exit = True,
scroll_exit=True,
)
self.purge_deleted = self.add_widget_intelligent(
npyscreen.Checkbox,
name='Purge deleted models from disk',
name="Purge deleted models from disk",
value=False,
scroll_exit=True
scroll_exit=True,
)
self.nextrely += 1
self.add_widget_intelligent(
@ -119,13 +118,13 @@ class addModelsForm(npyscreen.FormMultiPage):
if show_recommended and x in recommended_models
],
max_height=len(starter_model_labels) + 1,
relx = 4,
relx=4,
scroll_exit=True,
)
for line in [
'== IMPORT LOCAL AND REMOTE MODELS ==',
'Enter URLs, file paths, or HuggingFace diffusers repository IDs separated by spaces.',
'Use control-V or shift-control-V to paste:'
"== IMPORT LOCAL AND REMOTE MODELS ==",
"Enter URLs, file paths, or HuggingFace diffusers repository IDs separated by spaces.",
"Use control-V or shift-control-V to paste:",
]:
self.add_widget_intelligent(
npyscreen.TitleText,
@ -135,40 +134,36 @@ class addModelsForm(npyscreen.FormMultiPage):
)
self.nextrely -= 1
self.import_model_paths = self.add_widget_intelligent(
TextBox,
max_height=5,
scroll_exit=True,
editable=True,
relx=4
TextBox, max_height=5, scroll_exit=True, editable=True, relx=4
)
self.nextrely += 1
self.show_directory_fields= self.add_widget_intelligent(
self.show_directory_fields = self.add_widget_intelligent(
npyscreen.FormControlCheckbox,
name='Select a directory for models to import',
name="Select a directory for models to import",
value=False,
)
self.autoload_directory = self.add_widget_intelligent(
npyscreen.TitleFilename,
name='Directory (<tab> autocompletes):',
name="Directory (<tab> autocompletes):",
select_dir=True,
must_exist=True,
use_two_lines=False,
labelColor='DANGER',
labelColor="DANGER",
begin_entry_at=34,
scroll_exit=True,
)
self.autoscan_on_startup = self.add_widget_intelligent(
npyscreen.Checkbox,
name='Scan this directory each time InvokeAI starts for new models to import',
name="Scan this directory each time InvokeAI starts for new models to import",
value=False,
relx = 4,
relx=4,
scroll_exit=True,
)
self.nextrely += 1
self.convert_models = self.add_widget_intelligent(
npyscreen.TitleSelectOne,
name='== CONVERT IMPORTED MODELS INTO DIFFUSERS==',
values=['Keep original format','Convert to diffusers'],
name="== CONVERT IMPORTED MODELS INTO DIFFUSERS==",
values=["Keep original format", "Convert to diffusers"],
value=0,
begin_entry_at=4,
max_height=4,
@ -177,33 +172,33 @@ class addModelsForm(npyscreen.FormMultiPage):
)
self.cancel = self.add_widget_intelligent(
npyscreen.ButtonPress,
name='CANCEL',
rely = -3,
name="CANCEL",
rely=-3,
when_pressed_function=self.on_cancel,
)
done_label = 'DONE'
back_label = 'BACK'
done_label = "DONE"
back_label = "BACK"
button_length = len(done_label)
button_offset = 0
if self.multipage:
button_length += len(back_label)+1
button_offset += len(back_label)+1
button_length += len(back_label) + 1
button_offset += len(back_label) + 1
self.back_button = self.add_widget_intelligent(
npyscreen.ButtonPress,
name=back_label,
relx= (window_width-button_length)//2,
rely= -3,
when_pressed_function=self.on_back
relx=(window_width - button_length) // 2,
rely=-3,
when_pressed_function=self.on_back,
)
self.ok_button = self.add_widget_intelligent(
npyscreen.ButtonPress,
name=done_label,
relx= button_offset + 1 + (window_width-button_length)//2,
rely= -3,
when_pressed_function=self.on_ok
relx=button_offset + 1 + (window_width - button_length) // 2,
rely=-3,
when_pressed_function=self.on_ok,
)
for i in [self.autoload_directory,self.autoscan_on_startup]:
for i in [self.autoload_directory, self.autoscan_on_startup]:
self.show_directory_fields.addVisibleWhenSelected(i)
self.show_directory_fields.when_value_edited = self._clear_scan_directory
@ -216,14 +211,16 @@ class addModelsForm(npyscreen.FormMultiPage):
def _clear_scan_directory(self):
if not self.show_directory_fields.value:
self.autoload_directory.value = ''
self.autoload_directory.value = ""
def _show_hide_convert(self):
model_paths = self.import_model_paths.value or ''
autoload_directory = self.autoload_directory.value or ''
self.convert_models.hidden = len(model_paths)==0 and len(autoload_directory)==0
def _get_starter_model_labels(self)->List[str]:
model_paths = self.import_model_paths.value or ""
autoload_directory = self.autoload_directory.value or ""
self.convert_models.hidden = (
len(model_paths) == 0 and len(autoload_directory) == 0
)
def _get_starter_model_labels(self) -> List[str]:
window_height, window_width = curses.initscr().getmaxyx()
label_width = 25
checkbox_width = 4
@ -231,18 +228,29 @@ class addModelsForm(npyscreen.FormMultiPage):
description_width = window_width - label_width - checkbox_width - spacing_width
im = self.initial_models
names = self.starter_model_list
descriptions = [im[x].description [0:description_width-3]+'...'
if len(im[x].description) > description_width
else im[x].description
for x in names]
descriptions = [
im[x].description[0 : description_width - 3] + "..."
if len(im[x].description) > description_width
else im[x].description
for x in names
]
return [
f"%-{label_width}s %s" % (names[x], descriptions[x]) for x in range(0,len(names))
f"%-{label_width}s %s" % (names[x], descriptions[x])
for x in range(0, len(names))
]
def _get_columns(self)->int:
def _get_columns(self) -> int:
window_height, window_width = curses.initscr().getmaxyx()
cols = 4 if window_width > 240 else 3 if window_width>160 else 2 if window_width>80 else 1
return min(cols,len(self.installed_models))
cols = (
4
if window_width > 240
else 3
if window_width > 160
else 2
if window_width > 80
else 1
)
return min(cols, len(self.installed_models))
def on_ok(self):
self.parentApp.setNextForm(None)
@ -256,14 +264,14 @@ class addModelsForm(npyscreen.FormMultiPage):
def on_cancel(self):
if npyscreen.notify_yes_no(
'Are you sure you want to cancel?\nYou may re-run this script later using the invoke.sh or invoke.bat command.\n'
"Are you sure you want to cancel?\nYou may re-run this script later using the invoke.sh or invoke.bat command.\n"
):
self.parentApp.setNextForm(None)
self.parentApp.user_cancelled = True
self.editing = False
def marshall_arguments(self):
'''
"""
Assemble arguments and store as attributes of the application:
.starter_models: dict of model names to install from INITIAL_CONFIGURE.yaml
True => Install
@ -272,25 +280,27 @@ class addModelsForm(npyscreen.FormMultiPage):
.autoscan_on_startup: True if invokeai should scan and import at startup time
.import_model_paths: list of URLs, repo_ids and file paths to import
.convert_to_diffusers: if True, convert legacy checkpoints into diffusers
'''
"""
# we're using a global here rather than storing the result in the parentapp
# due to some bug in npyscreen that is causing attributes to be lost
selections = self.parentApp.user_selections
# starter models to install/remove
starter_models = dict(map(lambda x: (self.starter_model_list[x], True), self.models_selected.value))
selections.purge_deleted_models=False
if hasattr(self,'previously_installed_models'):
starter_models = dict(
map(
lambda x: (self.starter_model_list[x], True), self.models_selected.value
)
)
selections.purge_deleted_models = False
if hasattr(self, "previously_installed_models"):
unchecked = [
self.previously_installed_models.values[x]
for x in range(0,len(self.previously_installed_models.values))
for x in range(0, len(self.previously_installed_models.values))
if x not in self.previously_installed_models.value
]
starter_models.update(
map(lambda x: (x, False), unchecked)
)
starter_models.update(map(lambda x: (x, False), unchecked))
selections.purge_deleted_models = self.purge_deleted.value
selections.starter_models=starter_models
selections.starter_models = starter_models
# load directory and whether to scan on startup
if self.show_directory_fields.value:
@ -303,61 +313,72 @@ class addModelsForm(npyscreen.FormMultiPage):
# URLs and the like
selections.import_model_paths = self.import_model_paths.value.split()
selections.convert_to_diffusers = self.convert_models.value[0] == 1
class AddModelApplication(npyscreen.NPSAppManaged):
def __init__(self):
super().__init__()
self.user_cancelled = False
self.user_selections = Namespace(
starter_models = None,
purge_deleted_models = False,
scan_directory = None,
autoscan_on_startup = None,
import_model_paths = None,
convert_to_diffusers = None
starter_models=None,
purge_deleted_models=False,
scan_directory=None,
autoscan_on_startup=None,
import_model_paths=None,
convert_to_diffusers=None,
)
def onStart(self):
npyscreen.setTheme(npyscreen.Themes.DefaultTheme)
self.main_form = self.addForm(
"MAIN",
addModelsForm,
name="Install Stable Diffusion Models"
"MAIN", addModelsForm, name="Install Stable Diffusion Models"
)
# --------------------------------------------------------
def process_and_execute(opt: Namespace, selections: Namespace):
models_to_remove = [x for x in selections.starter_models if not selections.starter_models[x]]
models_to_install = [x for x in selections.starter_models if selections.starter_models[x]]
models_to_remove = [
x for x in selections.starter_models if not selections.starter_models[x]
]
models_to_install = [
x for x in selections.starter_models if selections.starter_models[x]
]
directory_to_scan = selections.scan_directory
scan_at_startup = selections.autoscan_on_startup
potential_models_to_install = selections.import_model_paths
convert_to_diffusers = selections.convert_to_diffusers
install_requested_models(
install_initial_models = models_to_install,
remove_models = models_to_remove,
scan_directory = Path(directory_to_scan) if directory_to_scan else None,
external_models = potential_models_to_install,
scan_at_startup = scan_at_startup,
convert_to_diffusers = convert_to_diffusers,
precision = 'float32' if opt.full_precision else choose_precision(torch.device(choose_torch_device())),
purge_deleted = selections.purge_deleted_models,
config_file_path = Path(opt.config_file) if opt.config_file else None,
install_initial_models=models_to_install,
remove_models=models_to_remove,
scan_directory=Path(directory_to_scan) if directory_to_scan else None,
external_models=potential_models_to_install,
scan_at_startup=scan_at_startup,
convert_to_diffusers=convert_to_diffusers,
precision="float32"
if opt.full_precision
else choose_precision(torch.device(choose_torch_device())),
purge_deleted=selections.purge_deleted_models,
config_file_path=Path(opt.config_file) if opt.config_file else None,
)
# --------------------------------------------------------
def select_and_download_models(opt: Namespace):
precision= 'float32' if opt.full_precision else choose_precision(torch.device(choose_torch_device()))
precision = (
"float32"
if opt.full_precision
else choose_precision(torch.device(choose_torch_device()))
)
if opt.default_only:
install_requested_models(
install_initial_models = default_dataset(),
precision = precision,
install_initial_models=default_dataset(),
precision=precision,
)
elif opt.yes_to_all:
install_requested_models(
install_initial_models = recommended_datasets(),
precision = precision,
install_initial_models=recommended_datasets(),
precision=precision,
)
else:
installApp = AddModelApplication()
@ -366,6 +387,7 @@ def select_and_download_models(opt: Namespace):
if not installApp.user_cancelled:
process_and_execute(opt, installApp.user_selections)
# -------------------------------------
def main():
parser = argparse.ArgumentParser(description="InvokeAI model downloader")
@ -410,8 +432,11 @@ def main():
Globals.root = os.path.expanduser(get_root(opt.root) or "")
if not global_config_dir().exists():
print('>> Your InvokeAI root directory is not set up. Calling invokeai-configure.')
print(
">> Your InvokeAI root directory is not set up. Calling invokeai-configure."
)
import ldm.invoke.config.invokeai_configure
ldm.invoke.config.invokeai_configure.main()
sys.exit(0)
@ -427,15 +452,16 @@ def main():
print(
"** Insufficient vertical space for the interface. Please make your window taller and try again"
)
elif str(e).startswith('addwstr'):
elif str(e).startswith("addwstr"):
print(
'** Insufficient horizontal space for the interface. Please make your window wider and try again.'
"** Insufficient horizontal space for the interface. Please make your window wider and try again."
)
else:
print(f"** An error has occurred: {str(e)}")
traceback.print_exc()
sys.exit(-1)
# -------------------------------------
if __name__ == "__main__":
main()

View File

@ -33,17 +33,22 @@ from omegaconf.dictconfig import DictConfig
from picklescan.scanner import scan_file_path
from ldm.invoke.devices import CPU_DEVICE
from ldm.invoke.generator.diffusers_pipeline import \
StableDiffusionGeneratorPipeline
from ldm.invoke.globals import (Globals, global_cache_dir)
from ldm.util import (ask_user, download_with_resume,
url_attachment_name, instantiate_from_config)
from ldm.invoke.generator.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ldm.invoke.globals import Globals, global_cache_dir
from ldm.util import (
ask_user,
download_with_resume,
instantiate_from_config,
url_attachment_name,
)
class SDLegacyType(Enum):
V1 = 1
V1 = 1
V1_INPAINT = 2
V2 = 3
UNKNOWN = 99
V2 = 3
UNKNOWN = 99
DEFAULT_MAX_MODELS = 2
VAE_TO_REPO_ID = { # hack, see note in convert_and_import()
@ -58,7 +63,7 @@ class ModelManager(object):
device_type: torch.device = CPU_DEVICE,
precision: str = "float16",
max_loaded_models=DEFAULT_MAX_MODELS,
sequential_offload = False
sequential_offload=False,
):
"""
Initialize with the path to the models.yaml config file,
@ -386,6 +391,7 @@ class ModelManager(object):
from ldm.invoke.ckpt_to_diffuser import (
load_pipeline_from_original_stable_diffusion_ckpt,
)
self.offload_model(self.current_model)
if vae_config := self._choose_diffusers_vae(model_name):
vae = self._load_vae(vae_config)
@ -396,13 +402,15 @@ class ModelManager(object):
original_config_file=config,
vae=vae,
return_generator_pipeline=True,
precision=torch.float16 if self.precision=='float16' else torch.float32,
precision=torch.float16
if self.precision == "float16"
else torch.float32,
)
if self.sequential_offload:
pipeline.enable_offload_submodels(self.device)
else:
pipeline.to(self.device)
return (
pipeline,
width,
@ -615,12 +623,12 @@ class ModelManager(object):
print(">> Model scanned ok")
def import_diffuser_model(
self,
repo_or_path: Union[str, Path],
model_name: str = None,
model_description: str = None,
vae: dict = None,
commit_to_conf: Path = None,
self,
repo_or_path: Union[str, Path],
model_name: str = None,
model_description: str = None,
vae: dict = None,
commit_to_conf: Path = None,
) -> bool:
"""
Attempts to install the indicated diffuser model and returns True if successful.
@ -640,15 +648,15 @@ class ModelManager(object):
vae=vae,
format="diffusers",
)
print(f'DEBUG: here i am 1')
print(f"DEBUG: here i am 1")
if isinstance(repo_or_path, Path) and repo_or_path.exists():
new_config.update(path=str(repo_or_path))
else:
new_config.update(repo_id=repo_or_path)
print(f'DEBUG: here i am 2')
print(f"DEBUG: here i am 2")
self.add_model(model_name, new_config, True)
print(f'DEBUG: config = {self.config}')
print(f"DEBUG: config = {self.config}")
if commit_to_conf:
self.commit(commit_to_conf)
return model_name
@ -685,14 +693,16 @@ class ModelManager(object):
model_name = model_name or url_attachment_name(weights)
weights_path = self._resolve_path(weights, "models/ldm/stable-diffusion-v1")
config_path = self._resolve_path(config, "configs/stable-diffusion")
config_path = self._resolve_path(config, "configs/stable-diffusion")
if weights_path is None or not weights_path.exists():
return
if config_path is None or not config_path.exists():
return
model_name = model_name or Path(weights).stem # note this gives ugly pathnames if used on a URL without a Content-Disposition header
model_name = (
model_name or Path(weights).stem
) # note this gives ugly pathnames if used on a URL without a Content-Disposition header
model_description = (
model_description or f"imported stable diffusion weights file {model_name}"
)
@ -712,8 +722,8 @@ class ModelManager(object):
return model_name
@classmethod
def probe_model_type(self, checkpoint: dict)->SDLegacyType:
'''
def probe_model_type(self, checkpoint: dict) -> SDLegacyType:
"""
Given a pickle or safetensors model object, probes contents
of the object and returns an SDLegacyType indicating its
format. Valid return values include:
@ -721,14 +731,16 @@ class ModelManager(object):
SDLegacyType.V1_INPAINT
SDLegacyType.V2
UNKNOWN
'''
"""
key_name = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
if key_name in checkpoint and checkpoint[key_name].shape[-1] == 1024:
return SDLegacyType.V2
try:
state_dict = checkpoint.get('state_dict') or checkpoint
in_channels = state_dict['model.diffusion_model.input_blocks.0.0.weight'].shape[1]
state_dict = checkpoint.get("state_dict") or checkpoint
in_channels = state_dict[
"model.diffusion_model.input_blocks.0.0.weight"
].shape[1]
if in_channels == 9:
return SDLegacyType.V1_INPAINT
elif in_channels == 4:
@ -739,15 +751,15 @@ class ModelManager(object):
return SDLegacyType.UNKNOWN
def heuristic_import(
self,
path_url_or_repo: str,
convert: bool= False,
model_name: str = None,
description: str = None,
commit_to_conf: Path=None,
)->str:
'''
Accept a string which could be:
self,
path_url_or_repo: str,
convert: bool = False,
model_name: str = None,
description: str = None,
commit_to_conf: Path = None,
) -> str:
"""
Accept a string which could be:
- a HF diffusers repo_id
- a URL pointing to a legacy .ckpt or .safetensors file
- a local path pointing to a legacy .ckpt or .safetensors file
@ -771,88 +783,119 @@ class ModelManager(object):
The (potentially derived) name of the model is returned on success, or None
on failure. When multiple models are added from a directory, only the last
imported one is returned.
'''
"""
model_path: Path = None
thing = path_url_or_repo # to save typing
print(f'>> Probing {thing} for import')
print(f">> Probing {thing} for import")
if thing.startswith(('http:','https:','ftp:')):
print(f' | {thing} appears to be a URL')
model_path = self._resolve_path(thing, 'models/ldm/stable-diffusion-v1') # _resolve_path does a download if needed
if thing.startswith(("http:", "https:", "ftp:")):
print(f" | {thing} appears to be a URL")
model_path = self._resolve_path(
thing, "models/ldm/stable-diffusion-v1"
) # _resolve_path does a download if needed
elif Path(thing).is_file() and thing.endswith(('.ckpt','.safetensors')):
if Path(thing).stem in ['model','diffusion_pytorch_model']:
print(f' | {Path(thing).name} appears to be part of a diffusers model. Skipping import')
elif Path(thing).is_file() and thing.endswith((".ckpt", ".safetensors")):
if Path(thing).stem in ["model", "diffusion_pytorch_model"]:
print(
f" | {Path(thing).name} appears to be part of a diffusers model. Skipping import"
)
return
else:
print(f' | {thing} appears to be a checkpoint file on disk')
model_path = self._resolve_path(thing, 'models/ldm/stable-diffusion-v1')
elif Path(thing).is_dir() and Path(thing, 'model_index.json').exists():
print(f' | {thing} appears to be a diffusers file on disk')
print(f" | {thing} appears to be a checkpoint file on disk")
model_path = self._resolve_path(thing, "models/ldm/stable-diffusion-v1")
elif Path(thing).is_dir() and Path(thing, "model_index.json").exists():
print(f" | {thing} appears to be a diffusers file on disk")
model_name = self.import_diffuser_model(
thing,
vae=dict(repo_id='stabilityai/sd-vae-ft-mse'),
vae=dict(repo_id="stabilityai/sd-vae-ft-mse"),
model_name=model_name,
description=description,
commit_to_conf=commit_to_conf
commit_to_conf=commit_to_conf,
)
elif Path(thing).is_dir():
if (Path(thing) / 'model_index.json').exists():
print(f'>> {thing} appears to be a diffusers model.')
model_name = self.import_diffuser_model(thing, commit_to_conf=commit_to_conf)
if (Path(thing) / "model_index.json").exists():
print(f">> {thing} appears to be a diffusers model.")
model_name = self.import_diffuser_model(
thing, commit_to_conf=commit_to_conf
)
else:
print(f'>> {thing} appears to be a directory. Will scan for models to import')
for m in list(Path(thing).rglob('*.ckpt')) + list(Path(thing).rglob('*.safetensors')):
if model_name := self.heuristic_import(str(m), convert, commit_to_conf=commit_to_conf):
print(f' >> {model_name} successfully imported')
print(
f">> {thing} appears to be a directory. Will scan for models to import"
)
for m in list(Path(thing).rglob("*.ckpt")) + list(
Path(thing).rglob("*.safetensors")
):
if model_name := self.heuristic_import(
str(m), convert, commit_to_conf=commit_to_conf
):
print(f" >> {model_name} successfully imported")
return model_name
elif re.match(r'^[\w.+-]+/[\w.+-]+$', thing):
print(f' | {thing} appears to be a HuggingFace diffusers repo_id')
model_name = self.import_diffuser_model(thing, commit_to_conf=commit_to_conf)
pipeline,_,_,_ = self._load_diffusers_model(self.config[model_name])
elif re.match(r"^[\w.+-]+/[\w.+-]+$", thing):
print(f" | {thing} appears to be a HuggingFace diffusers repo_id")
model_name = self.import_diffuser_model(
thing, commit_to_conf=commit_to_conf
)
pipeline, _, _, _ = self._load_diffusers_model(self.config[model_name])
else:
print(f"** {thing}: Unknown thing. Please provide a URL, file path, directory or HuggingFace repo_id")
print(
f"** {thing}: Unknown thing. Please provide a URL, file path, directory or HuggingFace repo_id"
)
# Model_path is set in the event of a legacy checkpoint file.
# If not set, we're all done
if not model_path:
return
if model_path.stem in self.config: #already imported
print(' | Already imported. Skipping')
if model_path.stem in self.config: # already imported
print(" | Already imported. Skipping")
return
# another round of heuristics to guess the correct config file.
checkpoint = safetensors.torch.load_file(model_path) if model_path.suffix == '.safetensors' else torch.load(model_path)
checkpoint = (
safetensors.torch.load_file(model_path)
if model_path.suffix == ".safetensors"
else torch.load(model_path)
)
model_type = self.probe_model_type(checkpoint)
model_config_file = None
if model_type == SDLegacyType.V1:
print(' | SD-v1 model detected')
model_config_file = Path(Globals.root,'configs/stable-diffusion/v1-inference.yaml')
print(" | SD-v1 model detected")
model_config_file = Path(
Globals.root, "configs/stable-diffusion/v1-inference.yaml"
)
elif model_type == SDLegacyType.V1_INPAINT:
print(' | SD-v1 inpainting model detected')
model_config_file = Path(Globals.root,'configs/stable-diffusion/v1-inpainting-inference.yaml')
print(" | SD-v1 inpainting model detected")
model_config_file = Path(
Globals.root, "configs/stable-diffusion/v1-inpainting-inference.yaml"
)
elif model_type == SDLegacyType.V2:
print(' | SD-v2 model detected; model will be converted to diffusers format')
model_config_file = Path(Globals.root,'configs/stable-diffusion/v2-inference-v.yaml')
print(
" | SD-v2 model detected; model will be converted to diffusers format"
)
model_config_file = Path(
Globals.root, "configs/stable-diffusion/v2-inference-v.yaml"
)
convert = True
else:
print(f'** {thing} is a legacy checkpoint file but not in a known Stable Diffusion model. Skipping import')
print(
f"** {thing} is a legacy checkpoint file but not in a known Stable Diffusion model. Skipping import"
)
return
if convert:
diffuser_path = Path(Globals.root, 'models',Globals.converted_ckpts_dir, model_path.stem)
diffuser_path = Path(
Globals.root, "models", Globals.converted_ckpts_dir, model_path.stem
)
model_name = self.convert_and_import(
model_path,
diffusers_path=diffuser_path,
vae=dict(repo_id='stabilityai/sd-vae-ft-mse'),
vae=dict(repo_id="stabilityai/sd-vae-ft-mse"),
model_name=model_name,
model_description=description,
original_config_file=model_config_file,
@ -864,7 +907,12 @@ class ModelManager(object):
config=model_config_file,
model_name=model_name,
model_description=description,
vae=str(Path(Globals.root,'models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt')),
vae=str(
Path(
Globals.root,
"models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt",
)
),
commit_to_conf=commit_to_conf,
)
if commit_to_conf:
@ -872,23 +920,25 @@ class ModelManager(object):
return model_name
def convert_and_import(
self,
ckpt_path: Path,
diffusers_path: Path,
model_name=None,
model_description=None,
vae=None,
original_config_file: Path = None,
commit_to_conf: Path = None,
self,
ckpt_path: Path,
diffusers_path: Path,
model_name=None,
model_description=None,
vae=None,
original_config_file: Path = None,
commit_to_conf: Path = None,
) -> dict:
"""
Convert a legacy ckpt weights file to diffuser model and import
into models.yaml.
"""
ckpt_path = self._resolve_path(ckpt_path, 'models/ldm/stable-diffusion-v1')
ckpt_path = self._resolve_path(ckpt_path, "models/ldm/stable-diffusion-v1")
if original_config_file:
original_config_file = self._resolve_path(original_config_file, 'configs/stable-diffusion')
original_config_file = self._resolve_path(
original_config_file, "configs/stable-diffusion"
)
new_config = None
from ldm.invoke.ckpt_to_diffuser import convert_ckpt_to_diffuser
@ -949,10 +999,11 @@ class ModelManager(object):
found_models = []
for file in files:
location = str(file.resolve()).replace("\\", "/")
if 'model.safetensors' not in location and 'diffusion_pytorch_model.safetensors' not in location:
found_models.append(
{"name": file.stem, "location": location}
)
if (
"model.safetensors" not in location
and "diffusion_pytorch_model.safetensors" not in location
):
found_models.append({"name": file.stem, "location": location})
return search_folder, found_models
@ -1112,7 +1163,7 @@ class ModelManager(object):
print("** Migration is done. Continuing...")
def _resolve_path(
self, source: Union[str, Path], dest_directory: str
self, source: Union[str, Path], dest_directory: str
) -> Optional[Path]:
resolved_path = None
if str(source).startswith(("http:", "https:", "ftp:")):