diff --git a/invokeai/frontend/install/import_images.py b/invokeai/frontend/install/import_images.py index 1870fafbf8..fe0f20c830 100644 --- a/invokeai/frontend/install/import_images.py +++ b/invokeai/frontend/install/import_images.py @@ -26,22 +26,28 @@ from prompt_toolkit.completion import PathCompleter from prompt_toolkit.key_binding import KeyBindings from invokeai.app.services.config import InvokeAIAppConfig + app_config = InvokeAIAppConfig.get_config() bindings = KeyBindings() -@bindings.add('c-c') + + +@bindings.add("c-c") def _(event): raise KeyboardInterrupt + # release notes # "Use All" with size dimensions not selectable in the UI will not load dimensions + class Config: """Configuration loader.""" + def __init__(self): pass - TIMESTAMP_STRING= datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + TIMESTAMP_STRING = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") INVOKE_DIRNAME = "invokeai" YAML_FILENAME = "invokeai.yaml" @@ -56,21 +62,21 @@ class Config: """find the yaml config file and load""" root = app_config.root_path if not self.confirm_and_load(os.path.abspath(root)): - print ("\r\nSpecify custom database and outputs paths:") + print("\r\nSpecify custom database and outputs paths:") self.confirm_and_load_from_user() - self.database_backup_dir = os.path.join(os.path.dirname(self.database_path),"backup") - self.thumbnail_path = os.path.join(self.outputs_path,"thumbnails") + self.database_backup_dir = os.path.join(os.path.dirname(self.database_path), "backup") + self.thumbnail_path = os.path.join(self.outputs_path, "thumbnails") def confirm_and_load(self, invoke_root): """Validates a yaml path exists, confirms the user wants to use it and loads config.""" - yaml_path = os.path.join(invoke_root,self.YAML_FILENAME) + yaml_path = os.path.join(invoke_root, self.YAML_FILENAME) if os.path.exists(yaml_path): db_dir, outdir = self.load_paths_from_yaml(yaml_path) if os.path.isabs(db_dir): - database_path = os.path.join(db_dir,self.DATABASE_FILENAME) + database_path = os.path.join(db_dir, self.DATABASE_FILENAME) else: - database_path = os.path.join(invoke_root,db_dir,self.DATABASE_FILENAME) + database_path = os.path.join(invoke_root, db_dir, self.DATABASE_FILENAME) if os.path.isabs(outdir): outputs_path = os.path.join(outdir, "images") @@ -86,7 +92,7 @@ class Config: text += "\n\nUse these paths for import (yes) or choose different ones (no) [Yn]: " if db_exists and outdir_exists: - if (prompt(text).strip() or 'Y').upper().startswith('Y'): + if (prompt(text).strip() or "Y").upper().startswith("Y"): self.database_path = database_path self.outputs_path = outputs_path return True @@ -98,36 +104,40 @@ class Config: else: message_dialog( title="Path not found", - text=f"Auto-discovery of configuration failed! Could not find ({yaml_path}), Custom paths can be specified." + text=f"Auto-discovery of configuration failed! Could not find ({yaml_path}), Custom paths can be specified.", ).run() return False def confirm_and_load_from_user(self): - default = '' + default = "" while True: database_path = os.path.expanduser( prompt( "Database: Specify absolute path to the database to import into: ", - completer=PathCompleter(expanduser=True, file_filter=lambda x: Path(x).is_dir() or x.endswith(('.db'))), - default = default, - )) + completer=PathCompleter( + expanduser=True, file_filter=lambda x: Path(x).is_dir() or x.endswith((".db")) + ), + default=default, + ) + ) if database_path.endswith(".db") and os.path.isabs(database_path) and os.path.exists(database_path): break - default = database_path + '/' if Path(database_path).is_dir() else database_path + default = database_path + "/" if Path(database_path).is_dir() else database_path - default = '' + default = "" while True: outputs_path = os.path.expanduser( prompt( "Outputs: Specify absolute path to outputs/images directory to import into: ", - completer = PathCompleter(expanduser=True, only_directories=True), - default = default, - )) + completer=PathCompleter(expanduser=True, only_directories=True), + default=default, + ) + ) if outputs_path.endswith("images") and os.path.isabs(outputs_path) and os.path.exists(outputs_path): break - default = outputs_path + '/' if Path(outputs_path).is_dir() else outputs_path - + default = outputs_path + "/" if Path(outputs_path).is_dir() else outputs_path + self.database_path = database_path self.outputs_path = outputs_path @@ -136,17 +146,19 @@ class Config: def load_paths_from_yaml(self, yaml_path): """Load an Invoke AI yaml file and get the database and outputs paths.""" try: - with open(yaml_path, 'rt', encoding=locale.getpreferredencoding()) as file: + with open(yaml_path, "rt", encoding=locale.getpreferredencoding()) as file: yamlinfo = yaml.safe_load(file) - db_dir = yamlinfo.get("InvokeAI",{}).get("Paths",{}).get("db_dir", None) - outdir = yamlinfo.get("InvokeAI",{}).get("Paths",{}).get("outdir", None) + db_dir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("db_dir", None) + outdir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("outdir", None) return db_dir, outdir except Exception: print(f"Failed to load paths from yaml file! {yaml_path}!") return None, None + class ImportStats: """DTO for tracking work progress.""" + def __init__(self): pass @@ -171,8 +183,10 @@ class ImportStats: out_str += f"{seconds:.2f} second(s)" return out_str + class InvokeAIMetadata: """DTO for core Invoke AI generation properties parsed from metadata.""" + def __init__(self): pass @@ -234,6 +248,7 @@ class InvokeAIMetadata: class InvokeAIMetadataParser: """Parses strings with json data to find Invoke AI core metadata properties.""" + def __init__(self): pass @@ -271,15 +286,15 @@ class InvokeAIMetadataParser: props.width = img_node.get("width") props.height = img_node.get("height") props.seed = img_node.get("seed") - props.rand_device = "cuda" #hardcoded since all generations pre 3.0 used cuda random noise instead of cpu + props.rand_device = "cuda" # hardcoded since all generations pre 3.0 used cuda random noise instead of cpu props.cfg_scale = img_node.get("cfg_scale") props.steps = img_node.get("steps") props.scheduler = self.map_scheduler(img_node.get("sampler")) props.strength = img_node.get("strength") if props.strength is None: - props.strength = img_node.get("strength_steps") # try second name for this property + props.strength = img_node.get("strength_steps") # try second name for this property props.init_image = img_node.get("init_image_path") - if props.init_image is None: # try second name for this property + if props.init_image is None: # try second name for this property props.init_image = img_node.get("init_img") # remove the path info from init_image so if we move the init image, it will be correctly relative in the new location if props.init_image is not None: @@ -299,7 +314,7 @@ class InvokeAIMetadataParser: props.imported_app_version = "3.0.0 or later" props.generation_mode = tag_value.get("type") if props.generation_mode is not None: - props.generation_mode = props.generation_mode.replace("t2l","txt2img").replace("l2l","img2img") + props.generation_mode = props.generation_mode.replace("t2l", "txt2img").replace("l2l", "img2img") props.width = tag_value.get("width") props.height = tag_value.get("height") @@ -318,30 +333,41 @@ class InvokeAIMetadataParser: if old_scheduler is None: return None - match(old_scheduler): - case "ddim" : return "ddim" - case "plms" : return "pnmd" - case "k_lms" : return "lms" - case "k_dpm_2" : return "kdpm_2" - case "k_dpm_2_a" : return "kdpm_2_a" - case "dpmpp_2" : return "dpmpp_2s" - case "k_dpmpp_2" : return "dpmpp_2m" - case "k_dpmpp_2_a" : return None #invalid, in 2.3.x, selecting this sample would just fallback to last run or plms if new session - case "k_euler" : return "euler" - case "k_euler_a" : return "euler_a" - case "k_heun" : return "heun" + match (old_scheduler): + case "ddim": + return "ddim" + case "plms": + return "pnmd" + case "k_lms": + return "lms" + case "k_dpm_2": + return "kdpm_2" + case "k_dpm_2_a": + return "kdpm_2_a" + case "dpmpp_2": + return "dpmpp_2s" + case "k_dpmpp_2": + return "dpmpp_2m" + case "k_dpmpp_2_a": + return None # invalid, in 2.3.x, selecting this sample would just fallback to last run or plms if new session + case "k_euler": + return "euler" + case "k_euler_a": + return "euler_a" + case "k_heun": + return "heun" return None def split_prompt(self, raw_prompt: str): """Split the unified prompt strings by extracting all negative prompt blocks out into the negative prompt.""" if raw_prompt is None: return "", "" - raw_prompt_search = raw_prompt.replace("\r","").replace("\n","") + raw_prompt_search = raw_prompt.replace("\r", "").replace("\n", "") matches = re.findall(r"\[(.+?)\]", raw_prompt_search) if len(matches) > 0: negative_prompt = "" if len(matches) == 1: - negative_prompt = matches[0].strip().strip(',') + negative_prompt = matches[0].strip().strip(",") else: for match in matches: negative_prompt += f"({match.strip().strip(',')})" @@ -350,10 +376,12 @@ class InvokeAIMetadataParser: positive_prompt = raw_prompt_search.strip() negative_prompt = "" - return positive_prompt,negative_prompt + return positive_prompt, negative_prompt + class DatabaseMapper: """Class to abstract database functionality.""" + def __init__(self, database_path, database_backup_dir): self.database_path = database_path self.database_backup_dir = database_backup_dir @@ -380,7 +408,7 @@ class DatabaseMapper: return True if len(rows) > 0 else False def add_new_image_to_database(self, filename, width, height, metadata, modified_date_string): - """Add an image to the database.""" + """Add an image to the database.""" sql_add_image = f"""INSERT INTO images (image_name, image_origin, image_category, width, height, session_id, node_id, metadata, is_intermediate, created_at, updated_at) VALUES ('{filename}', 'internal', 'general', {width}, {height}, null, null, '{metadata}', 0, '{modified_date_string}', '{modified_date_string}')""" self.cursor.execute(sql_add_image) @@ -391,7 +419,7 @@ VALUES ('{filename}', 'internal', 'general', {width}, {height}, null, null, '{me sql_find_board = f"SELECT board_id FROM boards WHERE board_name='{board_name}' COLLATE NOCASE" self.cursor.execute(sql_find_board) rows = self.cursor.fetchall() - if len(rows)>0: + if len(rows) > 0: return rows[0][0] else: board_date_string = datetime.datetime.utcnow().date().isoformat() @@ -417,18 +445,20 @@ VALUES ('{filename}', 'internal', 'general', {width}, {height}, null, null, '{me self.connection.close() def backup(self, timestamp_string): - """Take a backup of the database.""" + """Take a backup of the database.""" if not os.path.exists(self.database_backup_dir): - print (f"Database backup directory {self.database_backup_dir} does not exist -> creating...", end="") + print(f"Database backup directory {self.database_backup_dir} does not exist -> creating...", end="") os.makedirs(self.database_backup_dir) - print ("Done!") + print("Done!") database_backup_path = os.path.join(self.database_backup_dir, f"backup-{timestamp_string}-invokeai.db") - print (f"Making DB Backup at {database_backup_path}...", end="") + print(f"Making DB Backup at {database_backup_path}...", end="") shutil.copy2(self.database_path, database_backup_path) - print ("Done!") + print("Done!") + class MediaImportProcessor: """Containing class for script functionality.""" + def __init__(self): pass @@ -437,25 +467,28 @@ class MediaImportProcessor: def get_import_file_list(self): """Ask the user for the import folder and scan for the list of files to return.""" while True: - default = '' + default = "" while True: import_dir = os.path.expanduser( prompt( "Inputs: Specify absolute path containing InvokeAI .png images to import: ", - completer = PathCompleter(expanduser=True, only_directories=True), - default = default, - )) - if len(import_dir)>0 and Path(import_dir).is_dir(): + completer=PathCompleter(expanduser=True, only_directories=True), + default=default, + ) + ) + if len(import_dir) > 0 and Path(import_dir).is_dir(): break default = import_dir - recurse_directories = (prompt("Include files from subfolders recursively [yN]? ").strip() or 'N').upper().startswith('N') + recurse_directories = ( + (prompt("Include files from subfolders recursively [yN]? ").strip() or "N").upper().startswith("N") + ) if recurse_directories: is_recurse = False - matching_file_list = glob.glob(import_dir + '/*.png', recursive=False) + matching_file_list = glob.glob(import_dir + "/*.png", recursive=False) else: is_recurse = True - matching_file_list = glob.glob(import_dir + '/**/*.png', recursive=True) + matching_file_list = glob.glob(import_dir + "/**/*.png", recursive=True) if len(matching_file_list) > 0: return import_dir, is_recurse, matching_file_list @@ -475,17 +508,23 @@ class MediaImportProcessor: while True: print("\r\nOptions for board selection for imported images:") print(f"1) Select an existing board name. (found {len(board_names)})") - print( "2) Specify a board name to create/add to.") - print( "3) Create/add to board named 'IMPORT'.") - print(f"4) Create/add to board named 'IMPORT' with the current datetime string appended (.e.g IMPORT_{timestamp_string}).") - print( "5) Create/add to board named 'IMPORT' with a the original file app_version appended (.e.g IMPORT_2.2.5).") + print("2) Specify a board name to create/add to.") + print("3) Create/add to board named 'IMPORT'.") + print( + f"4) Create/add to board named 'IMPORT' with the current datetime string appended (.e.g IMPORT_{timestamp_string})." + ) + print( + "5) Create/add to board named 'IMPORT' with a the original file app_version appended (.e.g IMPORT_2.2.5)." + ) input_option = input("Specify desired board option: ") - match(input_option): - case "1" : + match (input_option): + case "1": if len(board_names) < 1: print("\r\nThere are no existing board names to choose from. Select another option!") continue - board_name = self.select_item_from_list(board_names, "board name", True, "Cancel, go back and choose a different board option.") + board_name = self.select_item_from_list( + board_names, "board name", True, "Cancel, go back and choose a different board option." + ) if board_name is not None: return board_name case "2": @@ -493,7 +532,7 @@ class MediaImportProcessor: board_name = input("Specify new/existing board name: ") if board_name: return board_name - case "3" : + case "3": return "IMPORT" case "4": return f"IMPORT_{timestamp_string}" @@ -502,7 +541,7 @@ class MediaImportProcessor: def select_item_from_list(self, items, entity_name, allow_cancel, cancel_string): """A general function to render a list of items to select in the console, prompt the user for a selection and ensure a valid entry is selected.""" - print (f"Select a {entity_name.lower()} from the following list:") + print(f"Select a {entity_name.lower()} from the following list:") index = 1 for item in items: print(f"{index}) {item}") @@ -516,8 +555,8 @@ class MediaImportProcessor: continue if allow_cancel and option_number == index: return None - if option_number >=1 and option_number <= len(items): - return items[option_number-1] + if option_number >= 1 and option_number <= len(items): + return items[option_number - 1] def import_image(self, filepath: str, board_name_option: str, db_mapper: DatabaseMapper, config: Config): """Import a single file by its path""" @@ -577,12 +616,14 @@ class MediaImportProcessor: latest_json_string = converted_field.to_json() - print (f"From Invoke AI Version {log_version_note} with dimensions {png_width} x {png_height}.") + print(f"From Invoke AI Version {log_version_note} with dimensions {png_width} x {png_height}.") # if metadata needs update, then update metdata and copy in one shot if destination_needs_meta_update: print("Updating metadata while copying...", end="") - self.update_file_metadata_while_copying(filepath, file_destination_path, "invokeai_metadata", latest_json_string) + self.update_file_metadata_while_copying( + filepath, file_destination_path, "invokeai_metadata", latest_json_string + ) print("Done!") else: print("No metadata update necessary, copying only...", end="") @@ -591,7 +632,7 @@ class MediaImportProcessor: # create thumbnail print("Creating thumbnail...", end="") - thumbnail_path = os.path.join(config.thumbnail_path, os.path.splitext(file_name)[0]) + ".webp" + thumbnail_path = os.path.join(config.thumbnail_path, os.path.splitext(file_name)[0]) + ".webp" thumbnail_size = 256, 256 with PIL.Image.open(filepath) as source_image: source_image.thumbnail(thumbnail_size) @@ -619,7 +660,7 @@ class MediaImportProcessor: db_mapper.add_new_image_to_database(file_name, png_width, png_height, latest_json_string, modified_time) print("Done!") - #add image to board + # add image to board print("Adding image to board......", end="") db_mapper.add_image_to_board(file_name, board_id) print("Done!") @@ -638,7 +679,7 @@ class MediaImportProcessor: # re-add any existing invoke ai tags unless they are the one we are trying to add for key in existing_img_info: if key != tag_name and key in ("dream", "Dream", "sd-metadata", "invokeai", "invokeai_metadata"): - metadata.add_text(key,existing_img_info[key]) + metadata.add_text(key, existing_img_info[key]) metadata.add_text(tag_name, tag_value) target_image.save(file_destination_path, pnginfo=metadata) @@ -648,7 +689,7 @@ class MediaImportProcessor: print("===============================================================================") print("This script will import images generated by earlier versions of") print("InvokeAI into the currently installed root directory:") - print(f' {app_config.root_path}') + print(f" {app_config.root_path}") print("If this is not what you want to do, type ctrl-C now to cancel.") # load config @@ -683,16 +724,22 @@ class MediaImportProcessor: print("- If the same file name already exists in the destination, the file will be skipped.") print("- If the same file name already has a record in the database, the file will be skipped.") print("- Invoke AI metadata tags will be updated/written into the imported copy only.") - print("- On the imported copy, only Invoke AI known tags (latest and legacy) will be retained (dream, sd-metadata, invokeai, invokeai_metadata)") - print("- A property 'imported_app_version' will be added to metadata that can be viewed in the UI's metadata viewer.") - print("- The new 3.x InvokeAI outputs folder structure is flat so recursively found source imges will all be placed into the single outputs/images folder.") + print( + "- On the imported copy, only Invoke AI known tags (latest and legacy) will be retained (dream, sd-metadata, invokeai, invokeai_metadata)" + ) + print( + "- A property 'imported_app_version' will be added to metadata that can be viewed in the UI's metadata viewer." + ) + print( + "- The new 3.x InvokeAI outputs folder structure is flat so recursively found source imges will all be placed into the single outputs/images folder." + ) while True: - should_continue = prompt("\nDo you wish to continue with the import [Yn] ? ").lower() or 'y' - if should_continue=='n': + should_continue = prompt("\nDo you wish to continue with the import [Yn] ? ").lower() or "y" + if should_continue == "n": print("\r\nCancelling Import") return - elif should_continue=='y': + elif should_continue == "y": print() break @@ -725,9 +772,10 @@ class MediaImportProcessor: print(f"Errors during import : {ImportStats.count_file_errors}") if ImportStats.count_imported > 0: print("\r\nBreakdown of imported files by version:") - for key,version in ImportStats.count_imported_by_version.items(): + for key, version in ImportStats.count_imported_by_version.items(): print(f" {key:20} : {version}") + def main(): try: processor = MediaImportProcessor() @@ -735,5 +783,6 @@ def main(): except KeyboardInterrupt: print("\r\n\r\nUser cancelled execution.") + if __name__ == "__main__": main()