blackified

This commit is contained in:
Lincoln Stein 2023-08-05 12:44:58 -04:00
parent c8ceb96091
commit 4043a4c21c

View File

@ -26,18 +26,24 @@ from prompt_toolkit.completion import PathCompleter
from prompt_toolkit.key_binding import KeyBindings
from invokeai.app.services.config import InvokeAIAppConfig
app_config = InvokeAIAppConfig.get_config()
bindings = KeyBindings()
@bindings.add('c-c')
@bindings.add("c-c")
def _(event):
raise KeyboardInterrupt
# release notes
# "Use All" with size dimensions not selectable in the UI will not load dimensions
class Config:
"""Configuration loader."""
def __init__(self):
pass
@ -86,7 +92,7 @@ class Config:
text += "\n\nUse these paths for import (yes) or choose different ones (no) [Yn]: "
if db_exists and outdir_exists:
if (prompt(text).strip() or 'Y').upper().startswith('Y'):
if (prompt(text).strip() or "Y").upper().startswith("Y"):
self.database_path = database_path
self.outputs_path = outputs_path
return True
@ -98,35 +104,39 @@ class Config:
else:
message_dialog(
title="Path not found",
text=f"Auto-discovery of configuration failed! Could not find ({yaml_path}), Custom paths can be specified."
text=f"Auto-discovery of configuration failed! Could not find ({yaml_path}), Custom paths can be specified.",
).run()
return False
def confirm_and_load_from_user(self):
default = ''
default = ""
while True:
database_path = os.path.expanduser(
prompt(
"Database: Specify absolute path to the database to import into: ",
completer=PathCompleter(expanduser=True, file_filter=lambda x: Path(x).is_dir() or x.endswith(('.db'))),
completer=PathCompleter(
expanduser=True, file_filter=lambda x: Path(x).is_dir() or x.endswith((".db"))
),
default=default,
))
)
)
if database_path.endswith(".db") and os.path.isabs(database_path) and os.path.exists(database_path):
break
default = database_path + '/' if Path(database_path).is_dir() else database_path
default = database_path + "/" if Path(database_path).is_dir() else database_path
default = ''
default = ""
while True:
outputs_path = os.path.expanduser(
prompt(
"Outputs: Specify absolute path to outputs/images directory to import into: ",
completer=PathCompleter(expanduser=True, only_directories=True),
default=default,
))
)
)
if outputs_path.endswith("images") and os.path.isabs(outputs_path) and os.path.exists(outputs_path):
break
default = outputs_path + '/' if Path(outputs_path).is_dir() else outputs_path
default = outputs_path + "/" if Path(outputs_path).is_dir() else outputs_path
self.database_path = database_path
self.outputs_path = outputs_path
@ -136,7 +146,7 @@ class Config:
def load_paths_from_yaml(self, yaml_path):
"""Load an Invoke AI yaml file and get the database and outputs paths."""
try:
with open(yaml_path, 'rt', encoding=locale.getpreferredencoding()) as file:
with open(yaml_path, "rt", encoding=locale.getpreferredencoding()) as file:
yamlinfo = yaml.safe_load(file)
db_dir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("db_dir", None)
outdir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("outdir", None)
@ -145,8 +155,10 @@ class Config:
print(f"Failed to load paths from yaml file! {yaml_path}!")
return None, None
class ImportStats:
"""DTO for tracking work progress."""
def __init__(self):
pass
@ -171,8 +183,10 @@ class ImportStats:
out_str += f"{seconds:.2f} second(s)"
return out_str
class InvokeAIMetadata:
"""DTO for core Invoke AI generation properties parsed from metadata."""
def __init__(self):
pass
@ -234,6 +248,7 @@ class InvokeAIMetadata:
class InvokeAIMetadataParser:
"""Parses strings with json data to find Invoke AI core metadata properties."""
def __init__(self):
pass
@ -319,17 +334,28 @@ class InvokeAIMetadataParser:
return None
match (old_scheduler):
case "ddim" : return "ddim"
case "plms" : return "pnmd"
case "k_lms" : return "lms"
case "k_dpm_2" : return "kdpm_2"
case "k_dpm_2_a" : return "kdpm_2_a"
case "dpmpp_2" : return "dpmpp_2s"
case "k_dpmpp_2" : return "dpmpp_2m"
case "k_dpmpp_2_a" : return None #invalid, in 2.3.x, selecting this sample would just fallback to last run or plms if new session
case "k_euler" : return "euler"
case "k_euler_a" : return "euler_a"
case "k_heun" : return "heun"
case "ddim":
return "ddim"
case "plms":
return "pnmd"
case "k_lms":
return "lms"
case "k_dpm_2":
return "kdpm_2"
case "k_dpm_2_a":
return "kdpm_2_a"
case "dpmpp_2":
return "dpmpp_2s"
case "k_dpmpp_2":
return "dpmpp_2m"
case "k_dpmpp_2_a":
return None # invalid, in 2.3.x, selecting this sample would just fallback to last run or plms if new session
case "k_euler":
return "euler"
case "k_euler_a":
return "euler_a"
case "k_heun":
return "heun"
return None
def split_prompt(self, raw_prompt: str):
@ -341,7 +367,7 @@ class InvokeAIMetadataParser:
if len(matches) > 0:
negative_prompt = ""
if len(matches) == 1:
negative_prompt = matches[0].strip().strip(',')
negative_prompt = matches[0].strip().strip(",")
else:
for match in matches:
negative_prompt += f"({match.strip().strip(',')})"
@ -352,8 +378,10 @@ class InvokeAIMetadataParser:
return positive_prompt, negative_prompt
class DatabaseMapper:
"""Class to abstract database functionality."""
def __init__(self, database_path, database_backup_dir):
self.database_path = database_path
self.database_backup_dir = database_backup_dir
@ -427,8 +455,10 @@ VALUES ('{filename}', 'internal', 'general', {width}, {height}, null, null, '{me
shutil.copy2(self.database_path, database_backup_path)
print("Done!")
class MediaImportProcessor:
"""Containing class for script functionality."""
def __init__(self):
pass
@ -437,25 +467,28 @@ class MediaImportProcessor:
def get_import_file_list(self):
"""Ask the user for the import folder and scan for the list of files to return."""
while True:
default = ''
default = ""
while True:
import_dir = os.path.expanduser(
prompt(
"Inputs: Specify absolute path containing InvokeAI .png images to import: ",
completer=PathCompleter(expanduser=True, only_directories=True),
default=default,
))
)
)
if len(import_dir) > 0 and Path(import_dir).is_dir():
break
default = import_dir
recurse_directories = (prompt("Include files from subfolders recursively [yN]? ").strip() or 'N').upper().startswith('N')
recurse_directories = (
(prompt("Include files from subfolders recursively [yN]? ").strip() or "N").upper().startswith("N")
)
if recurse_directories:
is_recurse = False
matching_file_list = glob.glob(import_dir + '/*.png', recursive=False)
matching_file_list = glob.glob(import_dir + "/*.png", recursive=False)
else:
is_recurse = True
matching_file_list = glob.glob(import_dir + '/**/*.png', recursive=True)
matching_file_list = glob.glob(import_dir + "/**/*.png", recursive=True)
if len(matching_file_list) > 0:
return import_dir, is_recurse, matching_file_list
@ -477,15 +510,21 @@ class MediaImportProcessor:
print(f"1) Select an existing board name. (found {len(board_names)})")
print("2) Specify a board name to create/add to.")
print("3) Create/add to board named 'IMPORT'.")
print(f"4) Create/add to board named 'IMPORT' with the current datetime string appended (.e.g IMPORT_{timestamp_string}).")
print( "5) Create/add to board named 'IMPORT' with a the original file app_version appended (.e.g IMPORT_2.2.5).")
print(
f"4) Create/add to board named 'IMPORT' with the current datetime string appended (.e.g IMPORT_{timestamp_string})."
)
print(
"5) Create/add to board named 'IMPORT' with a the original file app_version appended (.e.g IMPORT_2.2.5)."
)
input_option = input("Specify desired board option: ")
match (input_option):
case "1":
if len(board_names) < 1:
print("\r\nThere are no existing board names to choose from. Select another option!")
continue
board_name = self.select_item_from_list(board_names, "board name", True, "Cancel, go back and choose a different board option.")
board_name = self.select_item_from_list(
board_names, "board name", True, "Cancel, go back and choose a different board option."
)
if board_name is not None:
return board_name
case "2":
@ -582,7 +621,9 @@ class MediaImportProcessor:
# if metadata needs update, then update metdata and copy in one shot
if destination_needs_meta_update:
print("Updating metadata while copying...", end="")
self.update_file_metadata_while_copying(filepath, file_destination_path, "invokeai_metadata", latest_json_string)
self.update_file_metadata_while_copying(
filepath, file_destination_path, "invokeai_metadata", latest_json_string
)
print("Done!")
else:
print("No metadata update necessary, copying only...", end="")
@ -648,7 +689,7 @@ class MediaImportProcessor:
print("===============================================================================")
print("This script will import images generated by earlier versions of")
print("InvokeAI into the currently installed root directory:")
print(f' {app_config.root_path}')
print(f" {app_config.root_path}")
print("If this is not what you want to do, type ctrl-C now to cancel.")
# load config
@ -683,16 +724,22 @@ class MediaImportProcessor:
print("- If the same file name already exists in the destination, the file will be skipped.")
print("- If the same file name already has a record in the database, the file will be skipped.")
print("- Invoke AI metadata tags will be updated/written into the imported copy only.")
print("- On the imported copy, only Invoke AI known tags (latest and legacy) will be retained (dream, sd-metadata, invokeai, invokeai_metadata)")
print("- A property 'imported_app_version' will be added to metadata that can be viewed in the UI's metadata viewer.")
print("- The new 3.x InvokeAI outputs folder structure is flat so recursively found source imges will all be placed into the single outputs/images folder.")
print(
"- On the imported copy, only Invoke AI known tags (latest and legacy) will be retained (dream, sd-metadata, invokeai, invokeai_metadata)"
)
print(
"- A property 'imported_app_version' will be added to metadata that can be viewed in the UI's metadata viewer."
)
print(
"- The new 3.x InvokeAI outputs folder structure is flat so recursively found source imges will all be placed into the single outputs/images folder."
)
while True:
should_continue = prompt("\nDo you wish to continue with the import [Yn] ? ").lower() or 'y'
if should_continue=='n':
should_continue = prompt("\nDo you wish to continue with the import [Yn] ? ").lower() or "y"
if should_continue == "n":
print("\r\nCancelling Import")
return
elif should_continue=='y':
elif should_continue == "y":
print()
break
@ -728,6 +775,7 @@ class MediaImportProcessor:
for key, version in ImportStats.count_imported_by_version.items():
print(f" {key:20} : {version}")
def main():
try:
processor = MediaImportProcessor()
@ -735,5 +783,6 @@ def main():
except KeyboardInterrupt:
print("\r\n\r\nUser cancelled execution.")
if __name__ == "__main__":
main()