model installer downloads starter models + user-provided paths and repo_ids

- Ability to scan directory not yet implemented
- Can't download from Civitai due to incomplete URL download implementation
This commit is contained in:
Lincoln Stein 2023-02-16 00:34:15 -05:00
parent e87a2fe14b
commit 1bb07795d8
3 changed files with 341 additions and 180 deletions

View File

@ -4,6 +4,14 @@
# run this script from one with internet connectivity. The
# two machines must share a common .cache directory.
'''
This is the npyscreen frontend to the model installation application.
The work is actually done in a backend file named model_install_backend.py,
and is kicked off in the beforeEditing() method in a form with
the class name "outputForm". This decision allows the output from the
installation process to be captured and displayed in an attractive form.
'''
import argparse
import curses
import os
@ -14,22 +22,23 @@ from typing import List
import npyscreen
import torch
from pathlib import Path
from npyscreen import widget
from omegaconf import OmegaConf
from ..devices import choose_precision, choose_torch_device
from ..globals import Globals
from .widgets import MultiSelectColumns, TextBox
from .model_install_util import (Dataset_path, Default_config_file,
default_dataset, download_weight_datasets,
update_config_file, get_root
)
from .model_install_backend import (Dataset_path, default_config_file,
install_requested_models,
default_dataset, get_root
)
class addModelsForm(npyscreen.FormMultiPageAction):
def __init__(self, parentApp, name):
self.initial_models = OmegaConf.load(Dataset_path)
try:
self.existing_models = OmegaConf.load(Default_config_file)
self.existing_models = OmegaConf.load(default_config_file())
except:
self.existing_models = dict()
self.starter_model_list = [
@ -51,7 +60,7 @@ class addModelsForm(npyscreen.FormMultiPageAction):
x for x in list(self.initial_models.keys()) if x in self.existing_models
]
)
self.nextrely -= 1
self.add_widget_intelligent(
npyscreen.FixedText,
value='Use ctrl-N and ctrl-P to move to the <N>ext and <P>revious fields,',
@ -62,7 +71,7 @@ class addModelsForm(npyscreen.FormMultiPageAction):
value='cursor arrows to make a selection, and space to toggle checkboxes.',
editable=False,
)
self.nextrely += 1
if len(self.installed_models) > 0:
self.add_widget_intelligent(
npyscreen.TitleFixedText,
@ -83,10 +92,15 @@ class addModelsForm(npyscreen.FormMultiPageAction):
slow_scroll=True,
scroll_exit = True,
)
self.purge_deleted = self.add_widget_intelligent(
npyscreen.Checkbox,
name='Purge deleted models from disk',
value=False,
scroll_exit=True
)
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name="== UNINSTALLED STARTER MODELS ==",
name="== UNINSTALLED STARTER MODELS (recommended models selected) ==",
value="Select from a starter set of Stable Diffusion models from HuggingFace:",
begin_entry_at=2,
editable=False,
@ -99,42 +113,16 @@ class addModelsForm(npyscreen.FormMultiPageAction):
values=starter_model_labels,
value=[
self.starter_model_list.index(x)
for x in self.initial_models
for x in self.starter_model_list
if x in recommended_models
],
max_height=len(starter_model_labels) + 1,
relx = 4,
scroll_exit=True,
)
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name='== MODEL IMPORT DIRECTORY ==',
value='Import all models found in this directory (<tab> autocompletes):',
begin_entry_at=2,
editable=False,
color="CONTROL",
)
self.autoload_directory = self.add_widget_intelligent(
npyscreen.TitleFilename,
name='Directory:',
select_dir=True,
must_exist=True,
use_two_lines=False,
relx = 4,
labelColor='DANGER',
scroll_exit=True,
)
self.autoscan_on_startup = self.add_widget_intelligent(
npyscreen.Checkbox,
name='Scan this directory each time InvokeAI starts for new models to import.',
value=False,
relx = 4,
scroll_exit=True,
)
self.nextrely += 1
for line in [
'== INDIVIDUAL MODELS TO IMPORT ==',
'Enter list of URLs, paths models or HuggingFace diffusers repository IDs.',
'== IMPORT LOCAL AND REMOTE MODELS ==',
'Enter URLs, file paths, or HuggingFace diffusers repository IDs separated by spaces.',
'Use control-V or shift-control-V to paste:'
]:
self.add_widget_intelligent(
@ -151,7 +139,29 @@ class addModelsForm(npyscreen.FormMultiPageAction):
editable=True,
relx=4
)
self.nextrely += 2
self.nextrely += 1
self.show_directory_fields= self.add_widget_intelligent(
npyscreen.FormControlCheckbox,
name='Select a directory for models to import',
value=False,
)
self.autoload_directory = self.add_widget_intelligent(
npyscreen.TitleFilename,
name='Directory (<tab> autocompletes):',
select_dir=True,
must_exist=True,
use_two_lines=False,
labelColor='DANGER',
begin_entry_at=34,
scroll_exit=True,
)
self.autoscan_on_startup = self.add_widget_intelligent(
npyscreen.Checkbox,
name='Scan this directory each time InvokeAI starts for new models to import',
value=False,
relx = 4,
scroll_exit=True,
)
self.convert_models = self.add_widget_intelligent(
npyscreen.TitleSelectOne,
name='== CONVERT IMPORTED MODELS INTO DIFFUSERS==',
@ -160,15 +170,12 @@ class addModelsForm(npyscreen.FormMultiPageAction):
begin_entry_at=4,
scroll_exit=True,
)
for i in [self.autoload_directory,self.autoscan_on_startup]:
self.show_directory_fields.addVisibleWhenSelected(i)
def resize(self):
super().resize()
self.models_selected.values = self._get_starter_model_labels()
# thought this would dynamically resize the widget, but no luck
# self.previously_installed_models.columns = self._get_columns()
# self.previously_installed_models.max_height = 2+len(self.installed_models) // self._get_columns()
# self.previously_installed_models.make_contained_widgets()
# self.previously_installed_models.display()
def _get_starter_model_labels(self)->List[str]:
window_height, window_width = curses.initscr().getmaxyx()
@ -177,13 +184,13 @@ class addModelsForm(npyscreen.FormMultiPageAction):
spacing_width = 2
description_width = window_width - label_width - checkbox_width - spacing_width
im = self.initial_models
names = list(im.keys())
names = self.starter_model_list
descriptions = [im[x].description [0:description_width-3]+'...'
if len(im[x].description) > description_width
else im[x].description
for x in im]
for x in names]
return [
f"%-{label_width}s %s" % (names[x], descriptions[x]) for x in range(0,len(im))
f"%-{label_width}s %s" % (names[x], descriptions[x]) for x in range(0,len(names))
]
def _get_columns(self)->int:
@ -191,7 +198,7 @@ class addModelsForm(npyscreen.FormMultiPageAction):
return 4 if window_width > 240 else 3 if window_width>160 else 2 if window_width>80 else 1
def on_ok(self):
self.parentApp.setNextForm('MONITOR_OUTPUT')
self.parentApp.setNextForm(None)
self.editing = False
self.parentApp.user_cancelled = False
self.marshall_arguments()
@ -213,8 +220,7 @@ class addModelsForm(npyscreen.FormMultiPageAction):
.convert_to_diffusers: if True, convert legacy checkpoints into diffusers
'''
# starter models to install/remove
model_names = list(self.initial_models.keys())
starter_models = dict(map(lambda x: (model_names[x], True), self.models_selected.value))
starter_models = dict(map(lambda x: (self.starter_model_list[x], True), self.models_selected.value))
if hasattr(self,'previously_installed_models'):
unchecked = [
self.previously_installed_models.values[x]
@ -224,52 +230,100 @@ class addModelsForm(npyscreen.FormMultiPageAction):
starter_models.update(
map(lambda x: (x, False), unchecked)
)
self.parentApp.purge_deleted_models = self.purge_deleted.value
self.parentApp.starter_models=starter_models
# load directory and whether to scan on startup
self.parentApp.scan_directory = self.autoload_directory.value
self.parentApp.autoscan_on_startup = self.autoscan_on_startup.value
if self.show_directory_fields.value:
self.parentApp.scan_directory = self.autoload_directory.value
self.parentApp.autoscan_on_startup = self.autoscan_on_startup.value
else:
self.parentApp.scan_directory = None
self.parentApp.autoscan_on_startup = False
# URLs and the like
self.parentApp.import_model_paths = self.import_model_paths.value.split()
self.parentApp.convert_to_diffusers = self.convert_models.value != 0
class Log(object):
def __init__(self, writable):
self.writable = writable
# big chunk of dead code
# was intended to be a status area in which output of installation steps (including tqdm) was logged in real time
# Problem is that this requires a fork() and pipe, and not sure this will work properly on windows.
# So not using, but keep this here in case it is useful later
# class Log(object):
# def __init__(self, writable):
# self.writable = writable
def __enter__(self):
self._stdout = sys.stdout
sys.stdout = self.writable
return self
def __exit__(self, *args):
sys.stdout = self._stdout
# def __enter__(self):
# self._stdout = sys.stdout
# sys.stdout = self.writable
# return self
# def __exit__(self, *args):
# sys.stdout = self._stdout
class outputForm(npyscreen.ActionForm):
def create(self):
self.buffer = self.add_widget(
npyscreen.BufferPager,
editable=False,
)
# class outputForm(npyscreen.ActionForm):
# def create(self):
# self.done = False
# self.buffer = self.add_widget(
# npyscreen.BufferPager,
# editable=False,
# )
def write(self,string):
if string != '\n':
self.buffer.buffer([string])
# def write(self,string):
# if string != '\n':
# self.buffer.buffer([string])
def beforeEditing(self):
myapplication = self.parentApp
with Log(self):
print(f'DEBUG: these models will be removed: {[x for x in myapplication.starter_models if not myapplication.starter_models[x]]}')
print(f'DEBUG: these models will be installed: {[x for x in myapplication.starter_models if myapplication.starter_models[x]]}')
print(f'DEBUG: this directory will be scanned: {myapplication.scan_directory}')
print(f'DEBUG: scan at startup time? {myapplication.autoscan_on_startup}')
print(f'DEBUG: these things will be downloaded: {myapplication.import_model_paths}')
print(f'DEBUG: convert to diffusers? {myapplication.convert_to_diffusers}')
# def beforeEditing(self):
# if self.done:
# return
# installApp = self.parentApp
# with Log(self):
# models_to_remove = [x for x in installApp.starter_models if not installApp.starter_models[x]]
# models_to_install = [x for x in installApp.starter_models if installApp.starter_models[x]]
# directory_to_scan = installApp.scan_directory
# scan_at_startup = installApp.autoscan_on_startup
# potential_models_to_install = installApp.import_model_paths
# convert_to_diffusers = installApp.convert_to_diffusers
# print(f'these models will be removed: {models_to_remove}')
# print(f'these models will be installed: {models_to_install}')
# print(f'this directory will be scanned: {directory_to_scan}')
# print(f'these things will be downloaded: {potential_models_to_install}')
# print(f'scan at startup time? {scan_at_startup}')
# print(f'convert to diffusers? {convert_to_diffusers}')
# print(f'\nPress OK to proceed or Cancel.')
# def on_cancel(self):
# self.buffer.buffer(['goodbye!'])
# self.parentApp.setNextForm(None)
# self.editing = False
# def on_ok(self):
# if self.done:
# self.on_cancel()
# return
# installApp = self.parentApp
# with Log(self):
# models_to_remove = [x for x in installApp.starter_models if not installApp.starter_models[x]]
# models_to_install = [x for x in installApp.starter_models if installApp.starter_models[x]]
# directory_to_scan = installApp.scan_directory
# scan_at_startup = installApp.autoscan_on_startup
# potential_models_to_install = installApp.import_model_paths
# convert_to_diffusers = installApp.convert_to_diffusers
# install_requested_models(
# install_initial_models = models_to_install,
# remove_models = models_to_remove,
# scan_directory = Path(directory_to_scan) if directory_to_scan else None,
# external_models = potential_models_to_install,
# scan_at_startup = scan_at_startup,
# convert_to_diffusers = convert_to_diffusers,
# precision = 'float32' if installApp.opt.full_precision else choose_precision(torch.device(choose_torch_device())),
# config_file_path = Path(installApp.opt.config_file) if installApp.opt.config_file else None,
# )
# self.done = True
def on_ok(self):
self.buffer.buffer(['goodbye!'])
self.parentApp.setNextForm(None)
self.editing = False
class AddModelApplication(npyscreen.NPSAppManaged):
def __init__(self, saved_args=None):
@ -283,54 +337,43 @@ class AddModelApplication(npyscreen.NPSAppManaged):
addModelsForm,
name="Add/Remove Models",
)
self.output = self.addForm(
'MONITOR_OUTPUT',
outputForm,
name='Model Install Output'
)
# self.output = self.addForm(
# 'MONITOR_OUTPUT',
# outputForm,
# name='Model Install Output'
# )
# --------------------------------------------------------
def process_and_execute(app: npyscreen.NPSAppManaged):
models_to_remove = [x for x in app.starter_models if not app.starter_models[x]]
models_to_install = [x for x in app.starter_models if app.starter_models[x]]
directory_to_scan = app.scan_directory
scan_at_startup = app.autoscan_on_startup
potential_models_to_install = app.import_model_paths
convert_to_diffusers = app.convert_to_diffusers
install_requested_models(
install_initial_models = models_to_install,
remove_models = models_to_remove,
scan_directory = Path(directory_to_scan) if directory_to_scan else None,
external_models = potential_models_to_install,
scan_at_startup = scan_at_startup,
convert_to_diffusers = convert_to_diffusers,
precision = 'float32' if app.opt.full_precision else choose_precision(torch.device(choose_torch_device())),
purge_deleted = app.purge_deleted_models,
config_file_path = Path(app.opt.config_file) if app.opt.config_file else None,
)
# --------------------------------------------------------
def select_and_download_models(opt: Namespace):
if opt.default_only:
models_to_download = default_dataset()
install_requested_models(models_to_download)
else:
myapplication = AddModelApplication()
myapplication.run()
if not myapplication.user_cancelled:
print(f'DEBUG: these models will be removed: {[x for x in myapplication.starter_models if not myapplication.starter_models[x]]}')
print(f'DEBUG: these models will be installed: {[x for x in myapplication.starter_models if myapplication.starter_models[x]]}')
print(f'DEBUG: this directory will be scanned: {myapplication.scan_directory}')
print(f'DEBUG: scan at startup time? {myapplication.autoscan_on_startup}')
print(f'DEBUG: these things will be downloaded: {myapplication.import_model_paths}')
print(f'DEBUG: convert to diffusers? {myapplication.convert_to_diffusers}')
sys.exit(0)
if not models_to_download:
print(
'** No models were selected. To run this program again, select "Install initial models" from the invoke script.'
)
return
print("** Downloading and installing the selected models.")
precision = (
"float32"
if opt.full_precision
else choose_precision(torch.device(choose_torch_device()))
)
successfully_downloaded = download_weight_datasets(
models=models_to_download,
access_token=None,
precision=precision,
)
update_config_file(successfully_downloaded, opt)
if len(successfully_downloaded) < len(models_to_download):
print("** Some of the model downloads were not successful")
print(
"\nYour starting models were installed. To find and add more models, see https://invoke-ai.github.io/InvokeAI/installation/050_INSTALLING_MODELS"
)
installApp = AddModelApplication()
installApp.opt = opt
installApp.run()
process_and_execute(installApp)
# -------------------------------------
def main():
@ -392,7 +435,7 @@ def main():
'** Insufficient horizontal space for the interface. Please make your window wider and try again.'
)
else:
print(f"** A layout error has occurred: {str(e)}")
print(f"** An error has occurred: {str(e)}")
traceback.print_exc()
sys.exit(-1)

View File

@ -1,35 +1,29 @@
'''
"""
Utility (backend) functions used by model_install.py
'''
import argparse
"""
import os
import re
import shutil
import sys
import traceback
import warnings
from argparse import Namespace
from math import ceil
from pathlib import Path
from tempfile import TemporaryFile
import npyscreen
import requests
from diffusers import AutoencoderKL
from huggingface_hub import hf_hub_url
from omegaconf import OmegaConf
from omegaconf.dictconfig import DictConfig
from tqdm import tqdm
from typing import List
import invokeai.configs as configs
from ldm.invoke.devices import choose_precision, choose_torch_device
from ldm.invoke.generator.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ldm.invoke.globals import Globals, global_cache_dir, global_config_dir
from ldm.invoke.config.widgets import MultiSelectColumns
from ..generator.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ..globals import Globals, global_cache_dir, global_config_dir
from ..model_manager import ModelManager
warnings.filterwarnings("ignore")
import torch
# --------------------------globals-----------------------
Model_dir = "models"
Weights_dir = "ldm/stable-diffusion-v1/"
@ -37,12 +31,11 @@ Weights_dir = "ldm/stable-diffusion-v1/"
# the initial "configs" dir is now bundled in the `invokeai.configs` package
Dataset_path = Path(configs.__path__[0]) / "INITIAL_MODELS.yaml"
Default_config_file = Path(global_config_dir()) / "models.yaml"
SD_Configs = Path(global_config_dir()) / "stable-diffusion"
# initial models omegaconf
Datasets = None
Datasets = OmegaConf.load(Dataset_path)
Config_preamble = """# This file describes the alternative machine learning models
Config_preamble = """
# This file describes the alternative machine learning models
# available to InvokeAI script.
#
# To add a new model, follow the examples below. Each
@ -51,6 +44,60 @@ Config_preamble = """# This file describes the alternative machine learning mode
# was trained on.
"""
def default_config_file():
return Path(global_config_dir()) / "models.yaml"
def sd_configs():
return Path(global_config_dir()) / "stable-diffusion"
def initial_models():
global Datasets
if Datasets:
return Datasets
return (Datasets := OmegaConf.load(Dataset_path))
def install_requested_models(
install_initial_models: List[str] = None,
remove_models: List[str] = None,
scan_directory: Path = None,
external_models: List[str] = None,
scan_at_startup: bool = False,
convert_to_diffusers: bool = False,
precision: str = "float16",
purge_deleted: bool = False,
config_file_path: Path = None,
):
config_file_path =config_file_path or default_config_file()
model_manager = ModelManager(OmegaConf.load(config_file_path),precision=precision)
if remove_models and len(remove_models) > 0:
print("== DELETING UNCHECKED STARTER MODELS ==")
for model in remove_models:
print(f'{model}...')
model_manager.del_model(model, delete_files=purge_deleted)
model_manager.commit(config_file_path)
if install_initial_models and len(install_initial_models) > 0:
print("== INSTALLING SELECTED STARTER MODELS ==")
successfully_downloaded = download_weight_datasets(
models=install_initial_models,
access_token=None,
precision=precision,
) # for historical reasons, we don't use model manager here
update_config_file(successfully_downloaded, config_file_path)
if len(successfully_downloaded) < len(install_initial_models):
print("** Some of the model downloads were not successful")
if external_models and len(external_models)>0:
print("== INSTALLING EXTERNAL MODELS ==")
for path_url_or_repo in external_models:
model_manager.heuristic_import(
path_url_or_repo,
convert=convert_to_diffusers,
commit_to_conf=config_file_path
)
# -------------------------------------
def yes_or_no(prompt: str, default_yes=True):
default = "y" if default_yes else "n"
@ -60,6 +107,7 @@ def yes_or_no(prompt: str, default_yes=True):
else:
return response[0] in ("y", "Y")
# -------------------------------------
def get_root(root: str = None) -> str:
if root:
@ -69,11 +117,12 @@ def get_root(root: str = None) -> str:
else:
return Globals.root
# ---------------------------------------------
def recommended_datasets() -> dict:
datasets = dict()
for ds in Datasets.keys():
if Datasets[ds].get("recommended", False):
for ds in initial_models().keys():
if initial_models()[ds].get("recommended", False):
datasets[ds] = True
return datasets
@ -81,8 +130,8 @@ def recommended_datasets() -> dict:
# ---------------------------------------------
def default_dataset() -> dict:
datasets = dict()
for ds in Datasets.keys():
if Datasets[ds].get("default", False):
for ds in initial_models().keys():
if initial_models()[ds].get("default", False):
datasets[ds] = True
return datasets
@ -90,7 +139,7 @@ def default_dataset() -> dict:
# ---------------------------------------------
def all_datasets() -> dict:
datasets = dict()
for ds in Datasets.keys():
for ds in initial_models().keys():
datasets[ds] = True
return datasets
@ -102,26 +151,24 @@ def migrate_models_ckpt():
model_path = os.path.join(Globals.root, Model_dir, Weights_dir)
if not os.path.exists(os.path.join(model_path, "model.ckpt")):
return
new_name = Datasets["stable-diffusion-1.4"]["file"]
print('You seem to have the Stable Diffusion v4.1 "model.ckpt" already installed.')
rename = yes_or_no(f'Ok to rename it to "{new_name}" for future reference?')
if rename:
print(f"model.ckpt => {new_name}")
os.replace(
os.path.join(model_path, "model.ckpt"), os.path.join(model_path, new_name)
)
new_name = initial_models()["stable-diffusion-1.4"]["file"]
print('The Stable Diffusion v4.1 "model.ckpt" is already installed. The name will be changed to {new_name} to avoid confusion.')
print(f"model.ckpt => {new_name}")
os.replace(
os.path.join(model_path, "model.ckpt"), os.path.join(model_path, new_name)
)
# ---------------------------------------------
def download_weight_datasets(
models: dict, access_token: str, precision: str = "float32"
models: List[str], access_token: str, precision: str = "float32"
):
migrate_models_ckpt()
successful = dict()
for mod in models.keys():
for mod in models:
print(f"Downloading {mod}:")
successful[mod] = _download_repo_or_file(
Datasets[mod], access_token, precision=precision
initial_models()[mod], access_token, precision=precision
)
return successful
@ -255,21 +302,21 @@ def hf_download_with_resume(
# ---------------------------------------------
def update_config_file(successfully_downloaded: dict, opt: dict):
def update_config_file(successfully_downloaded: dict, config_file: Path):
config_file = (
Path(opt.config_file) if opt.config_file is not None else Default_config_file
Path(config_file) if config_file is not None else default_config_file()
)
# In some cases (incomplete setup, etc), the default configs directory might be missing.
# Create it if it doesn't exist.
# this check is ignored if opt.config_file is specified - user is assumed to know what they
# are doing if they are passing a custom config file from elsewhere.
if config_file is Default_config_file and not config_file.parent.exists():
if config_file is default_config_file() and not config_file.parent.exists():
configs_src = Dataset_path.parent
configs_dest = Default_config_file.parent
configs_dest = default_config_file().parent
shutil.copytree(configs_src, configs_dest, dirs_exist_ok=True)
yaml = new_config_file_contents(successfully_downloaded, config_file, opt)
yaml = new_config_file_contents(successfully_downloaded, config_file)
try:
backup = None
@ -306,7 +353,7 @@ def update_config_file(successfully_downloaded: dict, opt: dict):
# ---------------------------------------------
def new_config_file_contents(
successfully_downloaded: dict, config_file: Path, opt: dict
successfully_downloaded: dict, config_file: Path,
) -> str:
if config_file.exists():
conf = OmegaConf.load(str(config_file.expanduser().resolve()))
@ -319,10 +366,10 @@ def new_config_file_contents(
# version of the model was previously defined, and whether the current
# model is a diffusers (indicated with a path)
if conf.get(model) and Path(successfully_downloaded[model]).is_dir():
offer_to_delete_weights(model, conf[model], opt.yes_to_all)
delete_weights(model, conf[model])
stanza = {}
mod = Datasets[model]
mod = initial_models()[model]
stanza["description"] = mod["description"]
stanza["repo_id"] = mod["repo_id"]
stanza["format"] = mod["format"]
@ -336,7 +383,7 @@ def new_config_file_contents(
stanza["weights"] = os.path.relpath(
successfully_downloaded[model], start=Globals.root
)
stanza["config"] = os.path.normpath(os.path.join(SD_Configs, mod["config"]))
stanza["config"] = os.path.normpath(os.path.join(sd_configs(), mod["config"]))
if "vae" in mod:
if "file" in mod["vae"]:
stanza["vae"] = os.path.normpath(
@ -359,20 +406,20 @@ def new_config_file_contents(
# ---------------------------------------------
def offer_to_delete_weights(model_name: str, conf_stanza: dict, yes_to_all: bool):
def delete_weights(model_name: str, conf_stanza: dict):
if not (weights := conf_stanza.get("weights")):
return
if re.match("/VAE/", conf_stanza.get("config")):
return
if yes_to_all or yes_or_no(
f"\n** The checkpoint version of {model_name} is superseded by the diffusers version. Delete the original file {weights}?",
default_yes=False,
):
weights = Path(weights)
if not weights.is_absolute():
weights = Path(Globals.root) / weights
print(
f"\n** The checkpoint version of {model_name} is superseded by the diffusers version. Deleting the original file {weights}?"
)
weights = Path(weights)
if not weights.is_absolute():
weights = Path(Globals.root) / weights
try:
weights.unlink()
except OSError as e:
print(str(e))

View File

@ -11,6 +11,7 @@ import gc
import hashlib
import io
import os
import re
import sys
import textwrap
import time
@ -699,7 +700,78 @@ class ModelManager(object):
self.commit(commit_to_conf)
return True
def autoconvert_weights(
def heuristic_import(
self,
path_url_or_repo: str,
convert: bool= False,
commit_to_conf: Path=None,
):
model_path = None
thing = path_url_or_repo # to save typing
if thing.startswith(('http:','https:','ftp:')):
print(f'* {thing} appears to be a URL')
model_path = self._resolve_path(thing, 'models/ldm/stable-diffusion-v1') # _resolve_path does a download if needed
elif Path(thing).is_file() and thing.endswith(('.ckpt','.safetensors')):
print(f'* {thing} appears to be a checkpoint file on disk')
model_path = self._resolve_path(thing, 'models/ldm/stable-diffusion-v1')
elif Path(thing).is_dir() and Path(thing, 'model_index.json').exists():
print(f'* {thing} appears to be a diffusers file on disk')
self.import_diffusers_model(thing, commit_to_conf=commit_to_conf)
elif Path(thing).is_dir():
print(f'* {thing} appears to be a directory. Will scan for models to import')
for m in list(Path(thing).rglob('*.ckpt')) + list(Path(thing).rglob('*.safetensors')):
self.heuristic_import(m, convert, commit_to_conf=commit_to_conf)
return
elif re.match(r'^[\w.+-]+/[\w.+-]+$', thing):
print(f'* {thing} appears to be a HuggingFace diffusers repo_id')
self.import_diffuser_model(thing, commit_to_conf=commit_to_conf)
else:
print(f"* {thing}: Unknown thing. Please provide a URL, file path, directory or HuggingFace repo_id")
# Model_path is set in the event of a legacy checkpoint file.
# If not set, we're all done
if not model_path:
return
# another round of heuristics to guess the correct config file.
model_config_file = Path(Globals.root,'configs/stable-diffusion/v1-inpainting-inference.yaml')
checkpoint = safetensors.torch.load_file(model_path) if model_path.suffix == '.safetensors' else torch.load(model_path)
key_name = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
if key_name in checkpoint and checkpoint[key_name].shape[-1] == 1024:
print(f'* {thing} appears to be an SD-v2 model; model will be converted to diffusers format')
model_config_file = Path(Globals.root,'configs/stable-diffusion/v2-inference-v.yaml')
convert = True
elif re.search('inpaint', model_path, flags=re.IGNORECASE):
print(f'* {thing} appears to be an SD-v1 inpainting model')
model_config_file = Path(Globals.root,'configs/stable-diffusion/v1-inpainting-inference.yaml')
else:
print(f'* {thing} appears to be an SD-v1 model')
if convert:
diffuser_path = Path(Globals.root, 'models',Globals.converted_ckpts_dir, model_path.stem)
self.convert_and_import(
model_path,
diffusers_path=diffuser_path,
vae=dict(repo_id='stabilityai/sd-vae-ft-mse'),
original_config_file=model_config_file,
commit_to_conf=commit_to_conf,
)
else:
self.import_ckpt_model(
model_path,
config=model_config_file,
vae=Path(Globals.root,'models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt'),
commit_to_conf=commit_to_conf,
)
def autoconvert_weights (
self,
conf_path: Path,
weights_directory: Path = None,
@ -750,7 +822,6 @@ class ModelManager(object):
into models.yaml.
"""
new_config = None
import transformers
from ldm.invoke.ckpt_to_diffuser import convert_ckpt_to_diffuser