import os import re import shlex import sys import traceback from argparse import Namespace from pathlib import Path from typing import List, Optional, Union import click from compel import PromptParser if sys.platform == "darwin": os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1" import pyparsing # type: ignore import ldm.invoke from ..generate import Generate from .args import (Args, dream_cmd_from_png, metadata_dumps, metadata_from_png) from .generator.diffusers_pipeline import PipelineIntermediateState from .globals import Globals from .image_util import make_grid from .log import write_log from .model_manager import ModelManager from .pngwriter import PngWriter, retrieve_metadata, write_metadata from .readline import Completer, get_completer from ..util import url_attachment_name # global used in multiple functions (fix) infile = None def main(): """Initialize command-line parsers and the diffusion model""" global infile opt = Args() args = opt.parse_args() if not args: sys.exit(-1) if args.laion400m: print( "--laion400m flag has been deprecated. Please use --model laion400m instead." ) sys.exit(-1) if args.weights: print( "--weights argument has been deprecated. Please edit ./configs/models.yaml, and select the weights using --model instead." ) sys.exit(-1) if args.max_loaded_models is not None: if args.max_loaded_models <= 0: print("--max_loaded_models must be >= 1; using 1") args.max_loaded_models = 1 # alert - setting a few globals here Globals.try_patchmatch = args.patchmatch Globals.always_use_cpu = args.always_use_cpu Globals.internet_available = args.internet_available and check_internet() Globals.disable_xformers = not args.xformers Globals.sequential_guidance = args.sequential_guidance Globals.ckpt_convert = args.ckpt_convert print(f">> Internet connectivity is {Globals.internet_available}") if not args.conf: config_file = os.path.join(Globals.root, "configs", "models.yaml") if not os.path.exists(config_file): report_model_error( opt, FileNotFoundError(f"The file {config_file} could not be found.") ) print(f">> {ldm.invoke.__app_name__}, version {ldm.invoke.__version__}") print(f'>> InvokeAI runtime directory is "{Globals.root}"') # loading here to avoid long delays on startup # these two lines prevent a horrible warning message from appearing # when the frozen CLIP tokenizer is imported import transformers # type: ignore from ldm.generate import Generate transformers.logging.set_verbosity_error() import diffusers diffusers.logging.set_verbosity_error() # Loading Face Restoration and ESRGAN Modules gfpgan, codeformer, esrgan = load_face_restoration(opt) # normalize the config directory relative to root if not os.path.isabs(opt.conf): opt.conf = os.path.normpath(os.path.join(Globals.root, opt.conf)) if opt.embeddings: if not os.path.isabs(opt.embedding_path): embedding_path = os.path.normpath( os.path.join(Globals.root, opt.embedding_path) ) else: embedding_path = opt.embedding_path else: embedding_path = None # migrate legacy models ModelManager.migrate_models() # load the infile as a list of lines if opt.infile: try: if os.path.isfile(opt.infile): infile = open(opt.infile, "r", encoding="utf-8") elif opt.infile == "-": # stdin infile = sys.stdin else: raise FileNotFoundError(f"{opt.infile} not found.") except (FileNotFoundError, IOError) as e: print(f"{e}. Aborting.") sys.exit(-1) # creating a Generate object: try: gen = Generate( conf=opt.conf, model=opt.model, sampler_name=opt.sampler_name, embedding_path=embedding_path, full_precision=opt.full_precision, precision=opt.precision, gfpgan=gfpgan, codeformer=codeformer, esrgan=esrgan, free_gpu_mem=opt.free_gpu_mem, safety_checker=opt.safety_checker, max_loaded_models=opt.max_loaded_models, ) except (FileNotFoundError, TypeError, AssertionError) as e: report_model_error(opt, e) except (IOError, KeyError) as e: print(f"{e}. Aborting.") sys.exit(-1) if opt.seamless: print(">> changed to seamless tiling mode") # preload the model try: gen.load_model() except KeyError: pass except Exception as e: report_model_error(opt, e) # try to autoconvert new models # autoimport new .ckpt files if path := opt.autoconvert: gen.model_manager.autoconvert_weights( conf_path=opt.conf, weights_directory=path, ) # web server loops forever if opt.web or opt.gui: invoke_ai_web_server_loop(gen, gfpgan, codeformer, esrgan) sys.exit(0) if not infile: print( "\n* Initialization done! Awaiting your command (-h for help, 'q' to quit)" ) try: main_loop(gen, opt) except KeyboardInterrupt: print( f'\nGoodbye!\nYou can start InvokeAI again by running the "invoke.bat" (or "invoke.sh") script from {Globals.root}' ) except Exception: print(">> An error occurred:") traceback.print_exc() # TODO: main_loop() has gotten busy. Needs to be refactored. def main_loop(gen, opt): """prompt/read/execute loop""" global infile done = False doneAfterInFile = infile is not None path_filter = re.compile(r'[<>:"/\\|?*]') last_results = list() # The readline completer reads history from the .dream_history file located in the # output directory specified at the time of script launch. We do not currently support # changing the history file midstream when the output directory is changed. completer = get_completer(opt, models=gen.model_manager.list_models()) set_default_output_dir(opt, completer) if gen.model: add_embedding_terms(gen, completer) output_cntr = completer.get_current_history_length() + 1 # os.pathconf is not available on Windows if hasattr(os, "pathconf"): path_max = os.pathconf(opt.outdir, "PC_PATH_MAX") name_max = os.pathconf(opt.outdir, "PC_NAME_MAX") else: path_max = 260 name_max = 255 while not done: operation = "generate" try: command = get_next_command(infile, gen.model_name) except EOFError: done = infile is None or doneAfterInFile infile = None continue # skip empty lines if not command.strip(): continue if command.startswith(("#", "//")): continue if len(command.strip()) == 1 and command.startswith("q"): done = True break if not command.startswith("!history"): completer.add_history(command) if command.startswith("!"): command, operation = do_command(command, gen, opt, completer) if operation is None: continue if opt.parse_cmd(command) is None: continue if opt.init_img: try: if not opt.prompt: oldargs = metadata_from_png(opt.init_img) opt.prompt = oldargs.prompt print(f'>> Retrieved old prompt "{opt.prompt}" from {opt.init_img}') except (OSError, AttributeError, KeyError): pass if len(opt.prompt) == 0: opt.prompt = "" # width and height are set by model if not specified if not opt.width: opt.width = gen.width if not opt.height: opt.height = gen.height # retrieve previous value of init image if requested if opt.init_img is not None and re.match("^-\\d+$", opt.init_img): try: opt.init_img = last_results[int(opt.init_img)][0] print(f">> Reusing previous image {opt.init_img}") except IndexError: print(f">> No previous initial image at position {opt.init_img} found") opt.init_img = None continue # the outdir can change with each command, so we adjust it here set_default_output_dir(opt, completer) # try to relativize pathnames for attr in ("init_img", "init_mask", "init_color"): if getattr(opt, attr) and not os.path.exists(getattr(opt, attr)): basename = getattr(opt, attr) path = os.path.join(opt.outdir, basename) setattr(opt, attr, path) # retrieve previous value of seed if requested # Exception: for postprocess operations negative seed values # mean "discard the original seed and generate a new one" # (this is a non-obvious hack and needs to be reworked) if opt.seed is not None and opt.seed < 0 and operation != "postprocess": try: opt.seed = last_results[opt.seed][1] print(f">> Reusing previous seed {opt.seed}") except IndexError: print(f">> No previous seed at position {opt.seed} found") opt.seed = None continue if opt.strength is None: opt.strength = 0.75 if opt.out_direction is None else 0.83 if opt.with_variations is not None: opt.with_variations = split_variations(opt.with_variations) if opt.prompt_as_dir and operation == "generate": # sanitize the prompt to a valid folder name subdir = path_filter.sub("_", opt.prompt)[:name_max].rstrip(" .") # truncate path to maximum allowed length # 39 is the length of '######.##########.##########-##.png', plus two separators and a NUL subdir = subdir[: (path_max - 39 - len(os.path.abspath(opt.outdir)))] current_outdir = os.path.join(opt.outdir, subdir) print('Writing files to directory: "' + current_outdir + '"') # make sure the output directory exists if not os.path.exists(current_outdir): os.makedirs(current_outdir) else: if not os.path.exists(opt.outdir): os.makedirs(opt.outdir) current_outdir = opt.outdir # Here is where the images are actually generated! last_results = [] try: file_writer = PngWriter(current_outdir) results = [] # list of filename, prompt pairs grid_images = dict() # seed -> Image, only used if `opt.grid` prior_variations = opt.with_variations or [] prefix = file_writer.unique_prefix() step_callback = ( make_step_callback(gen, opt, prefix) if opt.save_intermediates > 0 else None ) def image_writer( image, seed, upscaled=False, first_seed=None, use_prefix=None, prompt_in=None, attention_maps_image=None, ): # note the seed is the seed of the current image # the first_seed is the original seed that noise is added to # when the -v switch is used to generate variations nonlocal prior_variations nonlocal prefix path = None if opt.grid: grid_images[seed] = image elif operation == "mask": filename = f"{prefix}.{use_prefix}.{seed}.png" tm = opt.text_mask[0] th = opt.text_mask[1] if len(opt.text_mask) > 1 else 0.5 formatted_dream_prompt = ( f"!mask {opt.input_file_path} -tm {tm} {th}" ) path = file_writer.save_image_and_prompt_to_png( image=image, dream_prompt=formatted_dream_prompt, metadata={}, name=filename, compress_level=opt.png_compression, ) results.append([path, formatted_dream_prompt]) else: if use_prefix is not None: prefix = use_prefix postprocessed = upscaled if upscaled else operation == "postprocess" opt.prompt = ( gen.huggingface_concepts_library.replace_triggers_with_concepts( opt.prompt or prompt_in ) ) # to avoid the problem of non-unique concept triggers filename, formatted_dream_prompt = prepare_image_metadata( opt, prefix, seed, operation, prior_variations, postprocessed, first_seed, ) path = file_writer.save_image_and_prompt_to_png( image=image, dream_prompt=formatted_dream_prompt, metadata=metadata_dumps( opt, seeds=[ seed if opt.variation_amount == 0 and len(prior_variations) == 0 else first_seed ], model_hash=gen.model_hash, ), name=filename, compress_level=opt.png_compression, ) # update rfc metadata if operation == "postprocess": tool = re.match( "postprocess:(\w+)", opt.last_operation ).groups()[0] add_postprocessing_to_metadata( opt, opt.input_file_path, filename, tool, formatted_dream_prompt, ) if (not postprocessed) or opt.save_original: # only append to results if we didn't overwrite an earlier output results.append([path, formatted_dream_prompt]) # so that the seed autocompletes (on linux|mac when -S or --seed specified if completer and operation == "generate": completer.add_seed(seed) completer.add_seed(first_seed) last_results.append([path, seed]) if operation == "generate": catch_ctrl_c = ( infile is None ) # if running interactively, we catch keyboard interrupts opt.last_operation = "generate" try: gen.prompt2image( image_callback=image_writer, step_callback=step_callback, catch_interrupts=catch_ctrl_c, **vars(opt), ) except (PromptParser.ParsingException, pyparsing.ParseException) as e: print("** An error occurred while processing your prompt **") print(f"** {str(e)} **") elif operation == "postprocess": print(f">> fixing {opt.prompt}") opt.last_operation = do_postprocess(gen, opt, image_writer) elif operation == "mask": print(f">> generating masks from {opt.prompt}") do_textmask(gen, opt, image_writer) if opt.grid and len(grid_images) > 0: grid_img = make_grid(list(grid_images.values())) grid_seeds = list(grid_images.keys()) first_seed = last_results[0][1] filename = f"{prefix}.{first_seed}.png" formatted_dream_prompt = opt.dream_prompt_str( seed=first_seed, grid=True, iterations=len(grid_images) ) formatted_dream_prompt += f" # {grid_seeds}" metadata = metadata_dumps( opt, seeds=grid_seeds, model_hash=gen.model_hash ) path = file_writer.save_image_and_prompt_to_png( image=grid_img, dream_prompt=formatted_dream_prompt, metadata=metadata, name=filename, ) results = [[path, formatted_dream_prompt]] except AssertionError as e: print(e) continue except OSError as e: print(e) continue print("Outputs:") log_path = os.path.join(current_outdir, "invoke_log") output_cntr = write_log(results, log_path, ("txt", "md"), output_cntr) print() print( f'\nGoodbye!\nYou can start InvokeAI again by running the "invoke.bat" (or "invoke.sh") script from {Globals.root}' ) # TO DO: remove repetitive code and the awkward command.replace() trope # Just do a simple parse of the command! def do_command(command: str, gen, opt: Args, completer) -> tuple: global infile operation = "generate" # default operation, alternative is 'postprocess' if command.startswith( "!dream" ): # in case a stored prompt still contains the !dream command command = command.replace("!dream ", "", 1) elif command.startswith("!fix"): command = command.replace("!fix ", "", 1) operation = "postprocess" elif command.startswith("!mask"): command = command.replace("!mask ", "", 1) operation = "mask" elif command.startswith("!switch"): model_name = command.replace("!switch ", "", 1) try: gen.set_model(model_name) add_embedding_terms(gen, completer) except KeyError as e: print(str(e)) except Exception as e: report_model_error(opt, e) completer.add_history(command) operation = None elif command.startswith("!models"): gen.model_manager.print_models() completer.add_history(command) operation = None elif command.startswith("!import"): path = shlex.split(command) if len(path) < 2: print( "** please provide (1) a URL to a .ckpt file to import; (2) a local path to a .ckpt file; or (3) a diffusers repository id in the form stabilityai/stable-diffusion-2-1" ) else: import_model(path[1], gen, opt, completer) completer.add_history(command) operation = None elif command.startswith("!convert"): path = shlex.split(command) if len(path) < 2: print("** please provide the path to a .ckpt or .safetensors model") elif not os.path.exists(path[1]): print(f"** {path[1]}: model not found") else: optimize_model(path[1], gen, opt, completer) completer.add_history(command) operation = None elif command.startswith("!optimize"): path = shlex.split(command) if len(path) < 2: print("** please provide an installed model name") elif not path[1] in gen.model_manager.list_models(): print(f"** {path[1]}: model not found") else: optimize_model(path[1], gen, opt, completer) completer.add_history(command) operation = None elif command.startswith("!edit"): path = shlex.split(command) if len(path) < 2: print("** please provide the name of a model") else: edit_model(path[1], gen, opt, completer) completer.add_history(command) operation = None elif command.startswith("!del"): path = shlex.split(command) if len(path) < 2: print("** please provide the name of a model") else: del_config(path[1], gen, opt, completer) completer.add_history(command) operation = None elif command.startswith("!fetch"): file_path = command.replace("!fetch", "", 1).strip() retrieve_dream_command(opt, file_path, completer) completer.add_history(command) operation = None elif command.startswith("!replay"): file_path = command.replace("!replay", "", 1).strip() if infile is None and os.path.isfile(file_path): infile = open(file_path, "r", encoding="utf-8") completer.add_history(command) operation = None elif command.startswith("!trigger"): print("Embedding trigger strings: ", ", ".join(gen.embedding_trigger_strings)) operation = None elif command.startswith("!history"): completer.show_history() operation = None elif command.startswith("!search"): search_str = command.replace("!search", "", 1).strip() completer.show_history(search_str) operation = None elif command.startswith("!clear"): completer.clear_history() operation = None elif re.match("^!(\d+)", command): command_no = re.match("^!(\d+)", command).groups()[0] command = completer.get_line(int(command_no)) completer.set_line(command) operation = None else: # not a recognized command, so give the --help text command = "-h" return command, operation def set_default_output_dir(opt: Args, completer: Completer): """ If opt.outdir is relative, we add the root directory to it normalize the outdir relative to root and make sure it exists. """ if not os.path.isabs(opt.outdir): opt.outdir = os.path.normpath(os.path.join(Globals.root, opt.outdir)) if not os.path.exists(opt.outdir): os.makedirs(opt.outdir) completer.set_default_dir(opt.outdir) def import_model(model_path: str, gen, opt, completer): """ model_path can be (1) a URL to a .ckpt file; (2) a local .ckpt file path; (3) a huggingface repository id; or (4) a local directory containing a diffusers model. """ model_path = model_path.replace('\\','/') # windows model_name = None if model_path.startswith(("http:", "https:", "ftp:")): model_name = import_ckpt_model(model_path, gen, opt, completer) elif ( os.path.exists(model_path) and model_path.endswith((".ckpt", ".safetensors")) and os.path.isfile(model_path) ): model_name = import_ckpt_model(model_path, gen, opt, completer) elif os.path.isdir(model_path): # Allow for a directory containing multiple models. models = list(Path(model_path).rglob("*.ckpt")) + list( Path(model_path).rglob("*.safetensors") ) if models: # Only the last model name will be used below. for model in sorted(models): if click.confirm(f"Import {model.stem} ?", default=True): model_name = import_ckpt_model(model, gen, opt, completer) print() else: model_name = import_diffuser_model(Path(model_path), gen, opt, completer) elif re.match(r"^[\w.+-]+/[\w.+-]+$", model_path): model_name = import_diffuser_model(model_path, gen, opt, completer) else: print( f"** {model_path} is neither the path to a .ckpt file nor a diffusers repository id. Can't import." ) if not model_name: return if not _verify_load(model_name, gen): print("** model failed to load. Discarding configuration entry") gen.model_manager.del_model(model_name) return if click.confirm('Make this the default model?', default=False): gen.model_manager.set_default_model(model_name) gen.model_manager.commit(opt.conf) completer.update_models(gen.model_manager.list_models()) print(f">> {model_name} successfully installed") def import_checkpoint_list(models: List[Path], gen, opt, completer)->List[str]: ''' Does a mass import of all the checkpoint/safetensors on a path list ''' model_names = list() choice = input('** Directory of checkpoint/safetensors models detected. Install ll or elected models? [a] ') or 'a' do_all = choice.startswith('a') if do_all: config_file = _ask_for_config_file(models[0], completer, plural=True) manager = gen.model_manager for model in sorted(models): model_name = f'{model.stem}' model_description = f'Imported model {model_name}' if model_name in manager.model_names(): print(f'** {model_name} is already imported. Skipping.') elif manager.import_ckpt_model( model, config = config_file, model_name = model_name, model_description = model_description, commit_to_conf = opt.conf): model_names.append(model_name) print(f'>> Model {model_name} imported successfully') else: print(f'** Model {model} failed to import') else: for model in sorted(models): if click.confirm(f'Import {model.stem} ?', default=True): if model_name := import_ckpt_model(model, gen, opt, completer): print(f'>> Model {model.stem} imported successfully') model_names.append(model_name) else: print(f'** Model {model} failed to import') print() return model_names def import_diffuser_model( path_or_repo: Union[Path, str], gen, _, completer ) -> Optional[str]: path_or_repo = path_or_repo.replace('\\','/') # windows manager = gen.model_manager default_name = Path(path_or_repo).stem default_description = f"Imported model {default_name}" model_name, model_description = _get_model_name_and_desc( manager, completer, model_name=default_name, model_description=default_description, ) vae = None if click.confirm('Replace this model\'s VAE with "stabilityai/sd-vae-ft-mse"?', default=False): vae = dict(repo_id='stabilityai/sd-vae-ft-mse') if not manager.import_diffuser_model( path_or_repo, model_name=model_name, vae=vae, description=model_description ): print("** model failed to import") return None return model_name def import_ckpt_model( path_or_url: Union[Path, str], gen, opt, completer ) -> Optional[str]: path_or_url = path_or_url.replace('\\','/') manager = gen.model_manager is_a_url = str(path_or_url).startswith(('http:','https:')) base_name = Path(url_attachment_name(path_or_url)).name if is_a_url else Path(path_or_url).name default_name = Path(base_name).stem default_description = f"Imported model {default_name}" model_name, model_description = _get_model_name_and_desc( manager, completer, model_name=default_name, model_description=default_description, ) config_file = None default = ( Path(Globals.root, "configs/stable-diffusion/v1-inpainting-inference.yaml") if re.search("inpaint", default_name, flags=re.IGNORECASE) else Path(Globals.root, "configs/stable-diffusion/v1-inference.yaml") ) completer.complete_extensions((".yaml", ".yml")) completer.set_line(str(default)) done = False while not done: config_file = input("Configuration file for this model: ").strip() done = os.path.exists(config_file) completer.complete_extensions((".ckpt", ".safetensors")) vae = None default = Path( Globals.root, "models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt" ) completer.set_line(str(default)) done = False while not done: vae = input("VAE file for this model (leave blank for none): ").strip() or None done = (not vae) or os.path.exists(vae) completer.complete_extensions(None) if not manager.import_ckpt_model( path_or_url, config=config_file, vae=vae, model_name=model_name, model_description=model_description, commit_to_conf=opt.conf, ): print("** model failed to import") return None return model_name def _verify_load(model_name: str, gen) -> bool: print(">> Verifying that new model loads...") current_model = gen.model_name try: if not gen.model_manager.get_model(model_name): return False except Exception as e: print(f'** model failed to load: {str(e)}') print('** note that importing 2.X checkpoints is not supported. Please use !convert_model instead.') return False if click.confirm('Keep model loaded?', default=True): gen.set_model(model_name) else: print(">> Restoring previous model") gen.set_model(current_model) return True def _get_model_name_and_desc( model_manager, completer, model_name: str = "", model_description: str = "" ): model_name = _get_model_name(model_manager.list_models(), completer, model_name) completer.set_line(model_description) model_description = ( input(f"Description for this model [{model_description}]: ").strip() or model_description ) return model_name, model_description def _ask_for_config_file(model_path: Union[str,Path], completer, plural: bool=False)->Path: default = '1' if re.search('inpaint',str(model_path),flags=re.IGNORECASE): default = '3' choices={ '1': 'v1-inference.yaml', '2': 'v2-inference-v.yaml', '3': 'v1-inpainting-inference.yaml', } prompt = '''What type of models are these?: [1] Models based on Stable Diffusion 1.X [2] Models based on Stable Diffusion 2.X [3] Inpainting models based on Stable Diffusion 1.X [4] Something else''' if plural else '''What type of model is this?: [1] A model based on Stable Diffusion 1.X [2] A model based on Stable Diffusion 2.X [3] An inpainting models based on Stable Diffusion 1.X [4] Something else''' print(prompt) choice = input(f'Your choice: [{default}] ') choice = choice.strip() or default if config_file := choices.get(choice,None): return Path('configs','stable-diffusion',config_file) # otherwise ask user to select done = False completer.complete_extensions(('.yaml','.yml')) completer.set_line(str(Path(Globals.root,'configs/stable-diffusion/'))) while not done: config_path = input('Configuration file for this model (leave blank to abort): ').strip() done = not config_path or os.path.exists(config_path) return config_path def optimize_model(model_name_or_path: Union[Path,str], gen, opt, completer): model_name_or_path = model_name_or_path.replace('\\','/') # windows manager = gen.model_manager ckpt_path = None original_config_file=None if model_name_or_path == gen.model_name: print("** Can't convert the active model. !switch to another model first. **") return elif model_info := manager.model_info(model_name_or_path): if "weights" in model_info: ckpt_path = Path(model_info["weights"]) original_config_file = Path(model_info["config"]) model_name = model_name_or_path model_description = model_info["description"] else: print(f"** {model_name_or_path} is not a legacy .ckpt weights file") return elif os.path.exists(model_name_or_path): original_config_file = original_config_file or _ask_for_config_file(model_name_or_path, completer) if not original_config_file: return ckpt_path = Path(model_name_or_path) model_name, model_description = _get_model_name_and_desc( manager, completer, ckpt_path.stem, f"Converted model {ckpt_path.stem}" ) else: print( f"** {model_name_or_path} is neither an existing model nor the path to a .ckpt file" ) return if not ckpt_path.is_absolute(): ckpt_path = Path(Globals.root, ckpt_path) if original_config_file and not original_config_file.is_absolute(): original_config_file = Path(Globals.root, original_config_file) diffuser_path = Path( Globals.root, "models", Globals.converted_ckpts_dir, model_name ) if diffuser_path.exists(): print( f"** {model_name_or_path} is already optimized. Will not overwrite. If this is an error, please remove the directory {diffuser_path} and try again." ) return vae = None if click.confirm('Replace this model\'s VAE with "stabilityai/sd-vae-ft-mse"?', default=False): vae = dict(repo_id='stabilityai/sd-vae-ft-mse') new_config = gen.model_manager.convert_and_import( ckpt_path, diffuser_path, model_name=model_name, model_description=model_description, vae=vae, original_config_file=original_config_file, commit_to_conf=opt.conf, ) if not new_config: return completer.update_models(gen.model_manager.list_models()) if click.confirm(f'Load optimized model {model_name}?', default=True): gen.set_model(model_name) if click.confirm(f'Delete the original .ckpt file at {ckpt_path}?',default=False): ckpt_path.unlink(missing_ok=True) print(f"{ckpt_path} deleted") def del_config(model_name: str, gen, opt, completer): current_model = gen.model_name if model_name == current_model: print("** Can't delete active model. !switch to another model first. **") return if model_name not in gen.model_manager.config: print(f"** Unknown model {model_name}") return if not click.confirm(f'Remove {model_name} from the list of models known to InvokeAI?',default=True): return delete_completely = click.confirm('Completely remove the model file or directory from disk?',default=False) gen.model_manager.del_model(model_name,delete_files=delete_completely) gen.model_manager.commit(opt.conf) print(f"** {model_name} deleted") completer.update_models(gen.model_manager.list_models()) def edit_model(model_name: str, gen, opt, completer): manager = gen.model_manager if not (info := manager.model_info(model_name)): print(f"** Unknown model {model_name}") return print(f"\n>> Editing model {model_name} from configuration file {opt.conf}") new_name = _get_model_name(manager.list_models(), completer, model_name) for attribute in info.keys(): if type(info[attribute]) != str: continue if attribute == "format": continue completer.set_line(info[attribute]) info[attribute] = input(f"{attribute}: ") or info[attribute] if new_name != model_name: manager.del_model(model_name) # this does the update manager.add_model(new_name, info, True) if click.confirm('Make this the default model?',default=False): manager.set_default_model(new_name) manager.commit(opt.conf) completer.update_models(manager.list_models()) print(">> Model successfully updated") def _get_model_name(existing_names, completer, default_name: str = "") -> str: done = False completer.set_line(default_name) while not done: model_name = input(f"Short name for this model [{default_name}]: ").strip() if len(model_name) == 0: model_name = default_name if not re.match("^[\w._+:/-]+$", model_name): print( '** model name must contain only words, digits and the characters "._+:/-" **' ) elif model_name != default_name and model_name in existing_names: print(f"** the name {model_name} is already in use. Pick another.") else: done = True return model_name def do_textmask(gen, opt, callback): image_path = opt.prompt if not os.path.exists(image_path): image_path = os.path.join(opt.outdir, image_path) assert os.path.exists( image_path ), '** "{opt.prompt}" not found. Please enter the name of an existing image file to mask **' assert ( opt.text_mask is not None and len(opt.text_mask) >= 1 ), "** Please provide a text mask with -tm **" opt.input_file_path = image_path tm = opt.text_mask[0] threshold = float(opt.text_mask[1]) if len(opt.text_mask) > 1 else 0.5 gen.apply_textmask( image_path=image_path, prompt=tm, threshold=threshold, callback=callback, ) def do_postprocess(gen, opt, callback): file_path = opt.prompt # treat the prompt as the file pathname if opt.new_prompt is not None: opt.prompt = opt.new_prompt else: opt.prompt = None if os.path.dirname(file_path) == "": # basename given file_path = os.path.join(opt.outdir, file_path) opt.input_file_path = file_path tool = None if opt.facetool_strength > 0: tool = opt.facetool elif opt.embiggen: tool = "embiggen" elif opt.upscale: tool = "upscale" elif opt.out_direction: tool = "outpaint" elif opt.outcrop: tool = "outcrop" opt.save_original = True # do not overwrite old image! opt.last_operation = f"postprocess:{tool}" try: gen.apply_postprocessor( image_path=file_path, tool=tool, facetool_strength=opt.facetool_strength, codeformer_fidelity=opt.codeformer_fidelity, save_original=opt.save_original, upscale=opt.upscale, upscale_denoise_str=opt.esrgan_denoise_str, out_direction=opt.out_direction, outcrop=opt.outcrop, callback=callback, opt=opt, ) except OSError: print(traceback.format_exc(), file=sys.stderr) print(f"** {file_path}: file could not be read") return except (KeyError, AttributeError): print(traceback.format_exc(), file=sys.stderr) return return opt.last_operation def add_postprocessing_to_metadata(opt, original_file, new_file, tool, command): original_file = ( original_file if os.path.exists(original_file) else os.path.join(opt.outdir, original_file) ) new_file = ( new_file if os.path.exists(new_file) else os.path.join(opt.outdir, new_file) ) try: meta = retrieve_metadata(original_file)["sd-metadata"] except AttributeError: try: meta = retrieve_metadata(new_file)["sd-metadata"] except AttributeError: meta = {} if "image" not in meta: meta = metadata_dumps(opt, seeds=[opt.seed])["image"] meta["image"] = {} img_data = meta.get("image") pp = img_data.get("postprocessing", []) or [] pp.append( { "tool": tool, "dream_command": command, } ) meta["image"]["postprocessing"] = pp write_metadata(new_file, meta) def prepare_image_metadata( opt, prefix, seed, operation="generate", prior_variations=[], postprocessed=False, first_seed=None, ): if postprocessed and opt.save_original: filename = choose_postprocess_name(opt, prefix, seed) else: wildcards = dict(opt.__dict__) wildcards["prefix"] = prefix wildcards["seed"] = seed try: filename = opt.fnformat.format(**wildcards) except KeyError as e: print( f"** The filename format contains an unknown key '{e.args[0]}'. Will use {{prefix}}.{{seed}}.png' instead" ) filename = f"{prefix}.{seed}.png" except IndexError: print( "** The filename format is broken or complete. Will use '{prefix}.{seed}.png' instead" ) filename = f"{prefix}.{seed}.png" if opt.variation_amount > 0: first_seed = first_seed or seed this_variation = [[seed, opt.variation_amount]] opt.with_variations = prior_variations + this_variation formatted_dream_prompt = opt.dream_prompt_str(seed=first_seed) elif len(prior_variations) > 0: formatted_dream_prompt = opt.dream_prompt_str(seed=first_seed) elif operation == "postprocess": formatted_dream_prompt = "!fix " + opt.dream_prompt_str( seed=seed, prompt=opt.input_file_path ) else: formatted_dream_prompt = opt.dream_prompt_str(seed=seed) return filename, formatted_dream_prompt def choose_postprocess_name(opt, prefix, seed) -> str: match = re.search("postprocess:(\w+)", opt.last_operation) if match: modifier = match.group( 1 ) # will look like "gfpgan", "upscale", "outpaint" or "embiggen" else: modifier = "postprocessed" counter = 0 filename = None available = False while not available: if counter == 0: filename = f"{prefix}.{seed}.{modifier}.png" else: filename = f"{prefix}.{seed}.{modifier}-{counter:02d}.png" available = not os.path.exists(os.path.join(opt.outdir, filename)) counter += 1 return filename def get_next_command(infile=None, model_name="no model") -> str: # command string if infile is None: command = input(f"({model_name}) invoke> ").strip() else: command = infile.readline() if not command: raise EOFError else: command = command.strip() if len(command) > 0: print(f"#{command}") return command def invoke_ai_web_server_loop(gen: Generate, gfpgan, codeformer, esrgan): print("\n* --web was specified, starting web server...") from invokeai.backend import InvokeAIWebServer # Change working directory to the stable-diffusion directory os.chdir(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) invoke_ai_web_server = InvokeAIWebServer( generate=gen, gfpgan=gfpgan, codeformer=codeformer, esrgan=esrgan ) try: invoke_ai_web_server.run() except KeyboardInterrupt: pass def add_embedding_terms(gen, completer): """ Called after setting the model, updates the autocompleter with any terms loaded by the embedding manager. """ trigger_strings = gen.model.textual_inversion_manager.get_all_trigger_strings() completer.add_embedding_terms(trigger_strings) def split_variations(variations_string) -> list: # shotgun parsing, woo parts = [] broken = False # python doesn't have labeled loops... for part in variations_string.split(","): seed_and_weight = part.split(":") if len(seed_and_weight) != 2: print(f'** Could not parse with_variation part "{part}"') broken = True break try: seed = int(seed_and_weight[0]) weight = float(seed_and_weight[1]) except ValueError: print(f'** Could not parse with_variation part "{part}"') broken = True break parts.append([seed, weight]) if broken: return None elif len(parts) == 0: return None else: return parts def load_face_restoration(opt): try: gfpgan, codeformer, esrgan = None, None, None if opt.restore or opt.esrgan: from ldm.invoke.restoration import Restoration restoration = Restoration() if opt.restore: gfpgan, codeformer = restoration.load_face_restore_models( opt.gfpgan_model_path ) else: print(">> Face restoration disabled") if opt.esrgan: esrgan = restoration.load_esrgan(opt.esrgan_bg_tile) else: print(">> Upscaling disabled") else: print(">> Face restoration and upscaling disabled") except (ModuleNotFoundError, ImportError): print(traceback.format_exc(), file=sys.stderr) print(">> You may need to install the ESRGAN and/or GFPGAN modules") return gfpgan, codeformer, esrgan def make_step_callback(gen, opt, prefix): destination = os.path.join(opt.outdir, "intermediates", prefix) os.makedirs(destination, exist_ok=True) print(f">> Intermediate images will be written into {destination}") def callback(state: PipelineIntermediateState): latents = state.latents step = state.step if step % opt.save_intermediates == 0 or step == opt.steps - 1: filename = os.path.join(destination, f"{step:04}.png") image = gen.sample_to_lowres_estimated_image(latents) image = image.resize((image.size[0]*8,image.size[1]*8)) image.save(filename, "PNG") return callback def retrieve_dream_command(opt, command, completer): """ Given a full or partial path to a previously-generated image file, will retrieve and format the dream command used to generate the image, and pop it into the readline buffer (linux, Mac), or print out a comment for cut-and-paste (windows) Given a wildcard path to a folder with image png files, will retrieve and format the dream command used to generate the images, and save them to a file commands.txt for further processing """ if len(command) == 0: return tokens = command.split() dir, basename = os.path.split(tokens[0]) if len(dir) == 0: path = os.path.join(opt.outdir, basename) else: path = tokens[0] if len(tokens) > 1: return write_commands(opt, path, tokens[1]) cmd = "" try: cmd = dream_cmd_from_png(path) except OSError: print(f"## {tokens[0]}: file could not be read") except (KeyError, AttributeError, IndexError): print(f"## {tokens[0]}: file has no metadata") except: print(f"## {tokens[0]}: file could not be processed") if len(cmd) > 0: completer.set_line(cmd) def write_commands(opt, file_path: str, outfilepath: str): dir, basename = os.path.split(file_path) try: paths = sorted(list(Path(dir).glob(basename))) except ValueError: print(f'## "{basename}": unacceptable pattern') return commands = [] cmd = None for path in paths: try: cmd = dream_cmd_from_png(path) except (KeyError, AttributeError, IndexError): print(f"## {path}: file has no metadata") except: print(f"## {path}: file could not be processed") if cmd: commands.append(f"# {path}") commands.append(cmd) if len(commands) > 0: dir, basename = os.path.split(outfilepath) if len(dir) == 0: outfilepath = os.path.join(opt.outdir, basename) with open(outfilepath, "w", encoding="utf-8") as f: f.write("\n".join(commands)) print(f">> File {outfilepath} with commands created") def report_model_error(opt: Namespace, e: Exception): print(f'** An error occurred while attempting to initialize the model: "{str(e)}"') print( "** This can be caused by a missing or corrupted models file, and can sometimes be fixed by (re)installing the models." ) yes_to_all = os.environ.get("INVOKE_MODEL_RECONFIGURE") if yes_to_all: print( "** Reconfiguration is being forced by environment variable INVOKE_MODEL_RECONFIGURE" ) else: if click.confirm('Do you want to run invokeai-configure script to select and/or reinstall models?', default=True): return print("invokeai-configure is launching....\n") # Match arguments that were set on the CLI # only the arguments accepted by the configuration script are parsed root_dir = ["--root", opt.root_dir] if opt.root_dir is not None else [] config = ["--config", opt.conf] if opt.conf is not None else [] previous_args = sys.argv sys.argv = ["invokeai-configure"] sys.argv.extend(root_dir) sys.argv.extend(config) if yes_to_all is not None: for arg in yes_to_all.split(): sys.argv.append(arg) from ldm.invoke.config import invokeai_configure invokeai_configure.main() print("** InvokeAI will now restart") sys.argv = previous_args main() # would rather do a os.exec(), but doesn't exist? sys.exit(0) def check_internet() -> bool: """ Return true if the internet is reachable. It does this by pinging huggingface.co. """ import urllib.request host = "http://huggingface.co" try: urllib.request.urlopen(host, timeout=1) return True except: return False if __name__ == '__main__': main()