mirror of
https://github.com/invoke-ai/InvokeAI
synced 2024-08-30 20:32:17 +00:00
578 lines
26 KiB
Python
578 lines
26 KiB
Python
# pylint: disable=line-too-long
|
|
# pylint: disable=broad-exception-caught
|
|
# pylint: disable=missing-function-docstring
|
|
"""Script to peform db maintenance and outputs directory management."""
|
|
|
|
import argparse
|
|
import datetime
|
|
import enum
|
|
import glob
|
|
import locale
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
import PIL
|
|
import PIL.ImageOps
|
|
import PIL.PngImagePlugin
|
|
import yaml
|
|
|
|
|
|
class ConfigMapper:
|
|
"""Configuration loader."""
|
|
|
|
def __init__(self): # noqa D107
|
|
pass
|
|
|
|
TIMESTAMP_STRING = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
INVOKE_DIRNAME = "invokeai"
|
|
YAML_FILENAME = "invokeai.yaml"
|
|
DATABASE_FILENAME = "invokeai.db"
|
|
|
|
DEFAULT_OUTDIR = "outputs"
|
|
DEFAULT_DB_DIR = "databases"
|
|
|
|
database_path = None
|
|
database_backup_dir = None
|
|
outputs_path = None
|
|
archive_path = None
|
|
thumbnails_path = None
|
|
thumbnails_archive_path = None
|
|
|
|
def load(self):
|
|
"""Read paths from yaml config and validate."""
|
|
root = "."
|
|
|
|
if not self.__load_from_root_config(os.path.abspath(root)):
|
|
return False
|
|
|
|
return True
|
|
|
|
def __load_from_root_config(self, invoke_root):
|
|
"""Validate a yaml path exists, confirm the user wants to use it and load config."""
|
|
yaml_path = os.path.join(invoke_root, self.YAML_FILENAME)
|
|
if not os.path.exists(yaml_path):
|
|
print(f"Unable to find invokeai.yaml at {yaml_path}!")
|
|
return False
|
|
if os.path.exists(yaml_path):
|
|
db_dir, outdir = self.__load_paths_from_yaml_file(yaml_path)
|
|
|
|
if db_dir is None:
|
|
db_dir = self.DEFAULT_DB_DIR
|
|
print(f"The invokeai.yaml file was found but is missing the db_dir setting! Defaulting to {db_dir}")
|
|
if outdir is None:
|
|
outdir = self.DEFAULT_OUTDIR
|
|
print(f"The invokeai.yaml file was found but is missing the outdir setting! Defaulting to {outdir}")
|
|
|
|
if os.path.isabs(db_dir):
|
|
self.database_path = os.path.join(db_dir, self.DATABASE_FILENAME)
|
|
else:
|
|
self.database_path = os.path.join(invoke_root, db_dir, self.DATABASE_FILENAME)
|
|
|
|
self.database_backup_dir = os.path.join(os.path.dirname(self.database_path), "backup")
|
|
|
|
if os.path.isabs(outdir):
|
|
self.outputs_path = os.path.join(outdir, "images")
|
|
self.archive_path = os.path.join(outdir, "images-archive")
|
|
else:
|
|
self.outputs_path = os.path.join(invoke_root, outdir, "images")
|
|
self.archive_path = os.path.join(invoke_root, outdir, "images-archive")
|
|
|
|
self.thumbnails_path = os.path.join(self.outputs_path, "thumbnails")
|
|
self.thumbnails_archive_path = os.path.join(self.archive_path, "thumbnails")
|
|
|
|
db_exists = os.path.exists(self.database_path)
|
|
outdir_exists = os.path.exists(self.outputs_path)
|
|
|
|
text = f"Found {self.YAML_FILENAME} file at {yaml_path}:"
|
|
text += f"\n Database : {self.database_path} - {'Exists!' if db_exists else 'Not Found!'}"
|
|
text += f"\n Outputs : {self.outputs_path}- {'Exists!' if outdir_exists else 'Not Found!'}"
|
|
print(text)
|
|
|
|
if db_exists and outdir_exists:
|
|
return True
|
|
else:
|
|
print(
|
|
"\nOne or more paths specified in invoke.yaml do not exist. Please inspect/correct the configuration and ensure the script is run in the developer console mode (option 8) from an Invoke AI root directory."
|
|
)
|
|
return False
|
|
else:
|
|
print(
|
|
f"Auto-discovery of configuration failed! Could not find ({yaml_path})!\n\nPlease ensure the script is run in the developer console mode (option 8) from an Invoke AI root directory."
|
|
)
|
|
return False
|
|
|
|
def __load_paths_from_yaml_file(self, yaml_path):
|
|
"""Load an Invoke AI yaml file and get the database and outputs paths."""
|
|
try:
|
|
with open(yaml_path, "rt", encoding=locale.getpreferredencoding()) as file:
|
|
yamlinfo = yaml.safe_load(file)
|
|
db_dir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("db_dir", None)
|
|
outdir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("outdir", None)
|
|
return db_dir, outdir
|
|
except Exception:
|
|
print(f"Failed to load paths from yaml file! {yaml_path}!")
|
|
return None, None
|
|
|
|
|
|
class MaintenanceStats:
|
|
"""DTO for tracking work progress."""
|
|
|
|
def __init__(self): # noqa D107
|
|
pass
|
|
|
|
time_start = datetime.datetime.utcnow()
|
|
count_orphaned_db_entries_cleaned = 0
|
|
count_orphaned_disk_files_cleaned = 0
|
|
count_orphaned_thumbnails_cleaned = 0
|
|
count_thumbnails_regenerated = 0
|
|
count_errors = 0
|
|
|
|
@staticmethod
|
|
def get_elapsed_time_string():
|
|
"""Get a friendly time string for the time elapsed since processing start."""
|
|
time_now = datetime.datetime.utcnow()
|
|
total_seconds = (time_now - MaintenanceStats.time_start).total_seconds()
|
|
hours = int((total_seconds) / 3600)
|
|
minutes = int(((total_seconds) % 3600) / 60)
|
|
seconds = total_seconds % 60
|
|
out_str = f"{hours} hour(s) -" if hours > 0 else ""
|
|
out_str += f"{minutes} minute(s) -" if minutes > 0 else ""
|
|
out_str += f"{seconds:.2f} second(s)"
|
|
return out_str
|
|
|
|
|
|
class DatabaseMapper:
|
|
"""Class to abstract database functionality."""
|
|
|
|
def __init__(self, database_path, database_backup_dir): # noqa D107
|
|
self.database_path = database_path
|
|
self.database_backup_dir = database_backup_dir
|
|
self.connection = None
|
|
self.cursor = None
|
|
|
|
def backup(self, timestamp_string):
|
|
"""Take a backup of the database."""
|
|
if not os.path.exists(self.database_backup_dir):
|
|
print(f"Database backup directory {self.database_backup_dir} does not exist -> creating...", end="")
|
|
os.makedirs(self.database_backup_dir)
|
|
print("Done!")
|
|
database_backup_path = os.path.join(self.database_backup_dir, f"backup-{timestamp_string}-invokeai.db")
|
|
print(f"Making DB Backup at {database_backup_path}...", end="")
|
|
shutil.copy2(self.database_path, database_backup_path)
|
|
print("Done!")
|
|
|
|
def connect(self):
|
|
"""Open connection to the database."""
|
|
self.connection = sqlite3.connect(self.database_path)
|
|
self.cursor = self.connection.cursor()
|
|
|
|
def get_all_image_files(self):
|
|
"""Get the full list of image file names from the database."""
|
|
sql_get_image_by_name = "SELECT image_name FROM images"
|
|
self.cursor.execute(sql_get_image_by_name)
|
|
rows = self.cursor.fetchall()
|
|
db_files = []
|
|
for row in rows:
|
|
db_files.append(row[0])
|
|
return db_files
|
|
|
|
def remove_image_file_record(self, filename: str):
|
|
"""Remove an image file reference from the database by filename."""
|
|
sanitized_filename = str.replace(filename, "'", "''") # prevent injection
|
|
sql_command = f"DELETE FROM images WHERE image_name='{sanitized_filename}'"
|
|
self.cursor.execute(sql_command)
|
|
self.connection.commit()
|
|
|
|
def does_image_exist(self, image_filename):
|
|
"""Check database if a image name already exists and return a boolean."""
|
|
sanitized_filename = str.replace(image_filename, "'", "''") # prevent injection
|
|
sql_get_image_by_name = f"SELECT image_name FROM images WHERE image_name='{sanitized_filename}'"
|
|
self.cursor.execute(sql_get_image_by_name)
|
|
rows = self.cursor.fetchall()
|
|
return True if len(rows) > 0 else False
|
|
|
|
def disconnect(self):
|
|
"""Disconnect from the db, cleaning up connections and cursors."""
|
|
if self.cursor is not None:
|
|
self.cursor.close()
|
|
if self.connection is not None:
|
|
self.connection.close()
|
|
|
|
|
|
class PhysicalFileMapper:
|
|
"""Containing class for script functionality."""
|
|
|
|
def __init__(self, outputs_path, thumbnails_path, archive_path, thumbnails_archive_path): # noqa D107
|
|
self.outputs_path = outputs_path
|
|
self.archive_path = archive_path
|
|
self.thumbnails_path = thumbnails_path
|
|
self.thumbnails_archive_path = thumbnails_archive_path
|
|
|
|
def create_archive_directories(self):
|
|
"""Create the directory for archiving orphaned image files."""
|
|
if not os.path.exists(self.archive_path):
|
|
print(f"Image archive directory ({self.archive_path}) does not exist -> creating...", end="")
|
|
os.makedirs(self.archive_path)
|
|
print("Created!")
|
|
if not os.path.exists(self.thumbnails_archive_path):
|
|
print(
|
|
f"Image thumbnails archive directory ({self.thumbnails_archive_path}) does not exist -> creating...",
|
|
end="",
|
|
)
|
|
os.makedirs(self.thumbnails_archive_path)
|
|
print("Created!")
|
|
|
|
def get_image_path_for_image_name(self, image_filename): # noqa D102
|
|
return os.path.join(self.outputs_path, image_filename)
|
|
|
|
def image_file_exists(self, image_filename): # noqa D102
|
|
return os.path.exists(self.get_image_path_for_image_name(image_filename))
|
|
|
|
def get_thumbnail_path_for_image(self, image_filename): # noqa D102
|
|
return os.path.join(self.thumbnails_path, os.path.splitext(image_filename)[0]) + ".webp"
|
|
|
|
def get_image_name_from_thumbnail_path(self, thumbnail_path): # noqa D102
|
|
return os.path.splitext(os.path.basename(thumbnail_path))[0] + ".png"
|
|
|
|
def thumbnail_exists_for_filename(self, image_filename): # noqa D102
|
|
return os.path.exists(self.get_thumbnail_path_for_image(image_filename))
|
|
|
|
def archive_image(self, image_filename): # noqa D102
|
|
if self.image_file_exists(image_filename):
|
|
image_path = self.get_image_path_for_image_name(image_filename)
|
|
shutil.move(image_path, self.archive_path)
|
|
|
|
def archive_thumbnail_by_image_filename(self, image_filename): # noqa D102
|
|
if self.thumbnail_exists_for_filename(image_filename):
|
|
thumbnail_path = self.get_thumbnail_path_for_image(image_filename)
|
|
shutil.move(thumbnail_path, self.thumbnails_archive_path)
|
|
|
|
def get_all_png_filenames_in_directory(self, directory_path): # noqa D102
|
|
filepaths = glob.glob(directory_path + "/*.png", recursive=False)
|
|
filenames = []
|
|
for filepath in filepaths:
|
|
filenames.append(os.path.basename(filepath))
|
|
return filenames
|
|
|
|
def get_all_thumbnails_with_full_path(self, thumbnails_directory): # noqa D102
|
|
return glob.glob(thumbnails_directory + "/*.webp", recursive=False)
|
|
|
|
def generate_thumbnail_for_image_name(self, image_filename): # noqa D102
|
|
# create thumbnail
|
|
file_path = self.get_image_path_for_image_name(image_filename)
|
|
thumb_path = self.get_thumbnail_path_for_image(image_filename)
|
|
thumb_size = 256, 256
|
|
with PIL.Image.open(file_path) as source_image:
|
|
source_image.thumbnail(thumb_size)
|
|
source_image.save(thumb_path, "webp")
|
|
|
|
|
|
class MaintenanceOperation(str, enum.Enum):
|
|
"""Enum class for operations."""
|
|
|
|
Ask = "ask"
|
|
CleanOrphanedDbEntries = "clean"
|
|
CleanOrphanedDiskFiles = "archive"
|
|
ReGenerateThumbnails = "thumbnails"
|
|
All = "all"
|
|
|
|
|
|
class InvokeAIDatabaseMaintenanceApp:
|
|
"""Main processor class for the application."""
|
|
|
|
_operation: MaintenanceOperation
|
|
_headless: bool = False
|
|
__stats: MaintenanceStats = MaintenanceStats()
|
|
|
|
def __init__(self, operation: MaintenanceOperation = MaintenanceOperation.Ask):
|
|
"""Initialize maintenance app."""
|
|
self._operation = MaintenanceOperation(operation)
|
|
self._headless = operation != MaintenanceOperation.Ask
|
|
|
|
def ask_for_operation(self) -> MaintenanceOperation:
|
|
"""Ask user to choose the operation to perform."""
|
|
while True:
|
|
print()
|
|
print("It is recommennded to run these operations as ordered below to avoid additional")
|
|
print("work being performed that will be discarded in a subsequent step.")
|
|
print()
|
|
print("Select maintenance operation:")
|
|
print()
|
|
print("1) Clean Orphaned Database Image Entries")
|
|
print(" Cleans entries in the database where the matching file was removed from")
|
|
print(" the outputs directory.")
|
|
print("2) Archive Orphaned Image Files")
|
|
print(" Files found in the outputs directory without an entry in the database are")
|
|
print(" moved to an archive directory.")
|
|
print("3) Re-Generate Missing Thumbnail Files")
|
|
print(" For files found in the outputs directory, re-generate a thumbnail if it")
|
|
print(" not found in the thumbnails directory.")
|
|
print()
|
|
print("(CTRL-C to quit)")
|
|
|
|
try:
|
|
input_option = int(input("Specify desired operation number (1-3): "))
|
|
|
|
operations = [
|
|
MaintenanceOperation.CleanOrphanedDbEntries,
|
|
MaintenanceOperation.CleanOrphanedDiskFiles,
|
|
MaintenanceOperation.ReGenerateThumbnails,
|
|
]
|
|
return operations[input_option - 1]
|
|
except (IndexError, ValueError):
|
|
print("\nInvalid selection!")
|
|
|
|
def ask_to_continue(self) -> bool:
|
|
"""Ask user whether they want to continue with the operation."""
|
|
while True:
|
|
input_choice = input("Do you wish to continue? (Y or N)? ")
|
|
if str.lower(input_choice) == "y":
|
|
return True
|
|
if str.lower(input_choice) == "n":
|
|
return False
|
|
|
|
def clean_orphaned_db_entries(
|
|
self, config: ConfigMapper, file_mapper: PhysicalFileMapper, db_mapper: DatabaseMapper
|
|
):
|
|
"""Clean dangling database entries that no longer point to a file in outputs."""
|
|
if self._headless:
|
|
print(f"Removing database references to images that no longer exist in {config.outputs_path}...")
|
|
else:
|
|
print()
|
|
print("===============================================================================")
|
|
print("= Clean Orphaned Database Entries")
|
|
print()
|
|
print("Perform this operation if you have removed files from the outputs/images")
|
|
print("directory but the database was never updated. You may see this as empty imaages")
|
|
print("in the app gallery, or images that only show an enlarged version of the")
|
|
print("thumbnail.")
|
|
print()
|
|
print(f"Database File Path : {config.database_path}")
|
|
print(f"Database backup will be taken at : {config.database_backup_dir}")
|
|
print(f"Outputs/Images Directory : {config.outputs_path}")
|
|
print(f"Outputs/Images Archive Directory : {config.archive_path}")
|
|
|
|
print("\nNotes about this operation:")
|
|
print("- This operation will find database image file entries that do not exist in the")
|
|
print(" outputs/images dir and remove those entries from the database.")
|
|
print("- This operation will target all image types including intermediate files.")
|
|
print("- If a thumbnail still exists in outputs/images/thumbnails matching the")
|
|
print(" orphaned entry, it will be moved to the archive directory.")
|
|
print()
|
|
|
|
if not self.ask_to_continue():
|
|
raise KeyboardInterrupt
|
|
|
|
file_mapper.create_archive_directories()
|
|
db_mapper.backup(config.TIMESTAMP_STRING)
|
|
db_mapper.connect()
|
|
db_files = db_mapper.get_all_image_files()
|
|
for db_file in db_files:
|
|
try:
|
|
if not file_mapper.image_file_exists(db_file):
|
|
print(f"Found orphaned image db entry {db_file}. Cleaning ...", end="")
|
|
db_mapper.remove_image_file_record(db_file)
|
|
print("Cleaned!")
|
|
if file_mapper.thumbnail_exists_for_filename(db_file):
|
|
print("A thumbnail was found, archiving ...", end="")
|
|
file_mapper.archive_thumbnail_by_image_filename(db_file)
|
|
print("Archived!")
|
|
self.__stats.count_orphaned_db_entries_cleaned += 1
|
|
except Exception as ex:
|
|
print("An error occurred cleaning db entry, error was:")
|
|
print(ex)
|
|
self.__stats.count_errors += 1
|
|
|
|
def clean_orphaned_disk_files(
|
|
self, config: ConfigMapper, file_mapper: PhysicalFileMapper, db_mapper: DatabaseMapper
|
|
):
|
|
"""Archive image files that no longer have entries in the database."""
|
|
if self._headless:
|
|
print(f"Archiving orphaned image files to {config.archive_path}...")
|
|
else:
|
|
print()
|
|
print("===============================================================================")
|
|
print("= Clean Orphaned Disk Files")
|
|
print()
|
|
print("Perform this operation if you have files that were copied into the outputs")
|
|
print("directory which are not referenced by the database. This can happen if you")
|
|
print("upgraded to a version with a fresh database, but re-used the outputs directory")
|
|
print("and now new images are mixed with the files not in the db. The script will")
|
|
print("archive these files so you can choose to delete them or re-import using the")
|
|
print("official import script.")
|
|
print()
|
|
print(f"Database File Path : {config.database_path}")
|
|
print(f"Database backup will be taken at : {config.database_backup_dir}")
|
|
print(f"Outputs/Images Directory : {config.outputs_path}")
|
|
print(f"Outputs/Images Archive Directory : {config.archive_path}")
|
|
|
|
print("\nNotes about this operation:")
|
|
print("- This operation will find image files not referenced by the database and move to an")
|
|
print(" archive directory.")
|
|
print("- This operation will target all image types including intermediate references.")
|
|
print("- The matching thumbnail will also be archived.")
|
|
print("- Any remaining orphaned thumbnails will also be archived.")
|
|
|
|
if not self.ask_to_continue():
|
|
raise KeyboardInterrupt
|
|
|
|
print()
|
|
|
|
file_mapper.create_archive_directories()
|
|
db_mapper.backup(config.TIMESTAMP_STRING)
|
|
db_mapper.connect()
|
|
phys_files = file_mapper.get_all_png_filenames_in_directory(config.outputs_path)
|
|
for phys_file in phys_files:
|
|
try:
|
|
if not db_mapper.does_image_exist(phys_file):
|
|
print(f"Found orphaned file {phys_file}, archiving...", end="")
|
|
file_mapper.archive_image(phys_file)
|
|
print("Archived!")
|
|
if file_mapper.thumbnail_exists_for_filename(phys_file):
|
|
print("Related thumbnail exists, archiving...", end="")
|
|
file_mapper.archive_thumbnail_by_image_filename(phys_file)
|
|
print("Archived!")
|
|
else:
|
|
print("No matching thumbnail existed to be cleaned.")
|
|
self.__stats.count_orphaned_disk_files_cleaned += 1
|
|
except Exception as ex:
|
|
print("Error found trying to archive file or thumbnail, error was:")
|
|
print(ex)
|
|
self.__stats.count_errors += 1
|
|
|
|
thumb_filepaths = file_mapper.get_all_thumbnails_with_full_path(config.thumbnails_path)
|
|
# archive any remaining orphaned thumbnails
|
|
for thumb_filepath in thumb_filepaths:
|
|
try:
|
|
thumb_src_image_name = file_mapper.get_image_name_from_thumbnail_path(thumb_filepath)
|
|
if not file_mapper.image_file_exists(thumb_src_image_name):
|
|
print(f"Found orphaned thumbnail {thumb_filepath}, archiving...", end="")
|
|
file_mapper.archive_thumbnail_by_image_filename(thumb_src_image_name)
|
|
print("Archived!")
|
|
self.__stats.count_orphaned_thumbnails_cleaned += 1
|
|
except Exception as ex:
|
|
print("Error found trying to archive thumbnail, error was:")
|
|
print(ex)
|
|
self.__stats.count_errors += 1
|
|
|
|
def regenerate_thumbnails(self, config: ConfigMapper, file_mapper: PhysicalFileMapper, *args):
|
|
"""Create missing thumbnails for any valid general images both in the db and on disk."""
|
|
if self._headless:
|
|
print("Regenerating missing image thumbnails...")
|
|
else:
|
|
print()
|
|
print("===============================================================================")
|
|
print("= Regenerate Thumbnails")
|
|
print()
|
|
print("This operation will find files that have no matching thumbnail on disk")
|
|
print("and regenerate those thumbnail files.")
|
|
print("NOTE: It is STRONGLY recommended that the user first clean/archive orphaned")
|
|
print(" disk files from the previous menu to avoid wasting time regenerating")
|
|
print(" thumbnails for orphaned files.")
|
|
|
|
print()
|
|
print(f"Outputs/Images Directory : {config.outputs_path}")
|
|
print(f"Outputs/Images Directory : {config.thumbnails_path}")
|
|
|
|
print("\nNotes about this operation:")
|
|
print("- This operation will find image files both referenced in the db and on disk")
|
|
print(" that do not have a matching thumbnail on disk and re-generate the thumbnail")
|
|
print(" file.")
|
|
|
|
if not self.ask_to_continue():
|
|
raise KeyboardInterrupt
|
|
|
|
print()
|
|
|
|
phys_files = file_mapper.get_all_png_filenames_in_directory(config.outputs_path)
|
|
for phys_file in phys_files:
|
|
try:
|
|
if not file_mapper.thumbnail_exists_for_filename(phys_file):
|
|
print(f"Found file without thumbnail {phys_file}...Regenerating Thumbnail...", end="")
|
|
file_mapper.generate_thumbnail_for_image_name(phys_file)
|
|
print("Done!")
|
|
self.__stats.count_thumbnails_regenerated += 1
|
|
except Exception as ex:
|
|
print("Error found trying to regenerate thumbnail, error was:")
|
|
print(ex)
|
|
self.__stats.count_errors += 1
|
|
|
|
def main(self): # noqa D107
|
|
print("\n===============================================================================")
|
|
print("Database and outputs Maintenance for Invoke AI 3.0.0 +")
|
|
print("===============================================================================\n")
|
|
|
|
config_mapper = ConfigMapper()
|
|
if not config_mapper.load():
|
|
print("\nInvalid configuration...exiting.\n")
|
|
return
|
|
|
|
file_mapper = PhysicalFileMapper(
|
|
config_mapper.outputs_path,
|
|
config_mapper.thumbnails_path,
|
|
config_mapper.archive_path,
|
|
config_mapper.thumbnails_archive_path,
|
|
)
|
|
db_mapper = DatabaseMapper(config_mapper.database_path, config_mapper.database_backup_dir)
|
|
|
|
op = self._operation
|
|
operations_to_perform = []
|
|
|
|
if op == MaintenanceOperation.Ask:
|
|
op = self.ask_for_operation()
|
|
|
|
if op in [MaintenanceOperation.CleanOrphanedDbEntries, MaintenanceOperation.All]:
|
|
operations_to_perform.append(self.clean_orphaned_db_entries)
|
|
if op in [MaintenanceOperation.CleanOrphanedDiskFiles, MaintenanceOperation.All]:
|
|
operations_to_perform.append(self.clean_orphaned_disk_files)
|
|
if op in [MaintenanceOperation.ReGenerateThumbnails, MaintenanceOperation.All]:
|
|
operations_to_perform.append(self.regenerate_thumbnails)
|
|
|
|
for operation in operations_to_perform:
|
|
operation(config_mapper, file_mapper, db_mapper)
|
|
|
|
print("\n===============================================================================")
|
|
print(f"= Maintenance Complete - Elapsed Time: {MaintenanceStats.get_elapsed_time_string()}")
|
|
print()
|
|
print(f"Orphaned db entries cleaned : {self.__stats.count_orphaned_db_entries_cleaned}")
|
|
print(f"Orphaned disk files archived : {self.__stats.count_orphaned_disk_files_cleaned}")
|
|
print(f"Orphaned thumbnail files archived : {self.__stats.count_orphaned_thumbnails_cleaned}")
|
|
print(f"Thumbnails regenerated : {self.__stats.count_thumbnails_regenerated}")
|
|
print(f"Errors during operation : {self.__stats.count_errors}")
|
|
|
|
print()
|
|
|
|
|
|
def main(): # noqa D107
|
|
parser = argparse.ArgumentParser(
|
|
description="InvokeAI image database maintenance utility",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""Operations:
|
|
ask Choose operation from a menu [default]
|
|
all Run all maintenance operations
|
|
clean Clean database of dangling entries
|
|
archive Archive orphaned image files
|
|
thumbnails Regenerate missing image thumbnails
|
|
""",
|
|
)
|
|
parser.add_argument("--root", default=".", type=Path, help="InvokeAI root directory")
|
|
parser.add_argument(
|
|
"--operation", default="ask", choices=[x.value for x in MaintenanceOperation], help="Operation to perform."
|
|
)
|
|
args = parser.parse_args()
|
|
try:
|
|
os.chdir(args.root)
|
|
app = InvokeAIDatabaseMaintenanceApp(args.operation)
|
|
app.main()
|
|
except KeyboardInterrupt:
|
|
print("\n\nUser cancelled execution.")
|
|
except FileNotFoundError:
|
|
print(f"Invalid root directory '{args.root}'.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|