InvokeAI/invokeai/frontend/install/import_images.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

787 lines
34 KiB
Python
Raw Normal View History

# Copyright (c) 2023 - The InvokeAI Team
2023-08-09 17:08:59 +00:00
# Primary Author: David Lovell (github @f412design, discord @techjedi)
# co-author, minor tweaks - Lincoln Stein
# pylint: disable=line-too-long
# pylint: disable=broad-exception-caught
"""Script to import images into the new database system for 3.0.0"""
import datetime
import glob
2023-08-18 14:57:18 +00:00
import json
import locale
import os
import re
2023-08-18 14:57:18 +00:00
import shutil
import sqlite3
import uuid
2023-08-18 14:57:18 +00:00
from pathlib import Path
import PIL
import PIL.ImageOps
import PIL.PngImagePlugin
2023-08-18 14:57:18 +00:00
import yaml
from prompt_toolkit import prompt
from prompt_toolkit.completion import PathCompleter
from prompt_toolkit.key_binding import KeyBindings
2023-08-18 14:57:18 +00:00
from prompt_toolkit.shortcuts import message_dialog
from invokeai.app.services.config import InvokeAIAppConfig
2023-08-05 16:44:58 +00:00
app_config = InvokeAIAppConfig.get_config()
bindings = KeyBindings()
@bindings.add("c-c")
def _(event):
raise KeyboardInterrupt
2023-08-05 16:44:58 +00:00
# release notes
# "Use All" with size dimensions not selectable in the UI will not load dimensions
2023-08-05 16:44:58 +00:00
class Config:
"""Configuration loader."""
2023-08-05 16:44:58 +00:00
def __init__(self):
pass
2023-08-05 16:44:58 +00:00
TIMESTAMP_STRING = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
INVOKE_DIRNAME = "invokeai"
YAML_FILENAME = "invokeai.yaml"
DATABASE_FILENAME = "invokeai.db"
database_path = None
database_backup_dir = None
outputs_path = None
thumbnail_path = None
def find_and_load(self):
"""Find the yaml config file and load"""
root = app_config.root_path
if not self.confirm_and_load(os.path.abspath(root)):
2023-08-05 16:44:58 +00:00
print("\r\nSpecify custom database and outputs paths:")
self.confirm_and_load_from_user()
2023-08-05 16:44:58 +00:00
self.database_backup_dir = os.path.join(os.path.dirname(self.database_path), "backup")
self.thumbnail_path = os.path.join(self.outputs_path, "thumbnails")
def confirm_and_load(self, invoke_root):
"""Validate a yaml path exists, confirms the user wants to use it and loads config."""
2023-08-05 16:44:58 +00:00
yaml_path = os.path.join(invoke_root, self.YAML_FILENAME)
if os.path.exists(yaml_path):
db_dir, outdir = self.load_paths_from_yaml(yaml_path)
if os.path.isabs(db_dir):
2023-08-05 16:44:58 +00:00
database_path = os.path.join(db_dir, self.DATABASE_FILENAME)
else:
2023-08-05 16:44:58 +00:00
database_path = os.path.join(invoke_root, db_dir, self.DATABASE_FILENAME)
if os.path.isabs(outdir):
outputs_path = os.path.join(outdir, "images")
else:
outputs_path = os.path.join(invoke_root, outdir, "images")
db_exists = os.path.exists(database_path)
outdir_exists = os.path.exists(outputs_path)
text = f"Found {self.YAML_FILENAME} file at {yaml_path}:"
text += f"\n Database : {database_path}"
text += f"\n Outputs : {outputs_path}"
text += "\n\nUse these paths for import (yes) or choose different ones (no) [Yn]: "
if db_exists and outdir_exists:
2023-08-05 16:44:58 +00:00
if (prompt(text).strip() or "Y").upper().startswith("Y"):
self.database_path = database_path
self.outputs_path = outputs_path
return True
else:
return False
else:
print(" Invalid: One or more paths in this config did not exist and cannot be used.")
else:
message_dialog(
title="Path not found",
2023-08-05 16:44:58 +00:00
text=f"Auto-discovery of configuration failed! Could not find ({yaml_path}), Custom paths can be specified.",
).run()
return False
def confirm_and_load_from_user(self):
2023-08-05 16:44:58 +00:00
default = ""
while True:
database_path = os.path.expanduser(
prompt(
"Database: Specify absolute path to the database to import into: ",
2023-08-05 16:44:58 +00:00
completer=PathCompleter(
expanduser=True, file_filter=lambda x: Path(x).is_dir() or x.endswith((".db"))
),
default=default,
)
)
if database_path.endswith(".db") and os.path.isabs(database_path) and os.path.exists(database_path):
break
default = database_path + "/" if Path(database_path).is_dir() else database_path
2023-08-05 16:44:58 +00:00
default = ""
while True:
outputs_path = os.path.expanduser(
prompt(
"Outputs: Specify absolute path to outputs/images directory to import into: ",
2023-08-05 16:44:58 +00:00
completer=PathCompleter(expanduser=True, only_directories=True),
default=default,
)
)
if outputs_path.endswith("images") and os.path.isabs(outputs_path) and os.path.exists(outputs_path):
break
default = outputs_path + "/" if Path(outputs_path).is_dir() else outputs_path
2023-08-05 16:44:58 +00:00
self.database_path = database_path
self.outputs_path = outputs_path
return
def load_paths_from_yaml(self, yaml_path):
"""Load an Invoke AI yaml file and get the database and outputs paths."""
try:
2023-08-05 16:44:58 +00:00
with open(yaml_path, "rt", encoding=locale.getpreferredencoding()) as file:
yamlinfo = yaml.safe_load(file)
2023-08-05 16:44:58 +00:00
db_dir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("db_dir", None)
outdir = yamlinfo.get("InvokeAI", {}).get("Paths", {}).get("outdir", None)
return db_dir, outdir
except Exception:
print(f"Failed to load paths from yaml file! {yaml_path}!")
return None, None
2023-08-05 16:44:58 +00:00
class ImportStats:
"""DTO for tracking work progress."""
2023-08-05 16:44:58 +00:00
def __init__(self):
pass
time_start = datetime.datetime.utcnow()
count_source_files = 0
count_skipped_file_exists = 0
count_skipped_db_exists = 0
count_imported = 0
count_imported_by_version = {}
count_file_errors = 0
@staticmethod
def get_elapsed_time_string():
"""Get a friendly time string for the time elapsed since processing start."""
time_now = datetime.datetime.utcnow()
total_seconds = (time_now - ImportStats.time_start).total_seconds()
hours = int((total_seconds) / 3600)
minutes = int(((total_seconds) % 3600) / 60)
seconds = total_seconds % 60
out_str = f"{hours} hour(s) -" if hours > 0 else ""
out_str += f"{minutes} minute(s) -" if minutes > 0 else ""
out_str += f"{seconds:.2f} second(s)"
return out_str
2023-08-05 16:44:58 +00:00
class InvokeAIMetadata:
"""DTO for core Invoke AI generation properties parsed from metadata."""
2023-08-05 16:44:58 +00:00
def __init__(self):
pass
def __str__(self):
formatted_str = f"{self.generation_mode}~{self.steps}~{self.cfg_scale}~{self.model_name}~{self.scheduler}~{self.seed}~{self.width}~{self.height}~{self.rand_device}~{self.strength}~{self.init_image}"
formatted_str += f"\r\npositive_prompt: {self.positive_prompt}"
formatted_str += f"\r\nnegative_prompt: {self.negative_prompt}"
return formatted_str
generation_mode = None
steps = None
cfg_scale = None
model_name = None
scheduler = None
seed = None
width = None
height = None
rand_device = None
strength = None
init_image = None
positive_prompt = None
negative_prompt = None
imported_app_version = None
def to_json(self):
"""Convert the active instance to json format."""
prop_dict = {}
prop_dict["generation_mode"] = self.generation_mode
# dont render prompt nodes if neither are set to avoid the ui thinking it can set them
# if at least one exists, render them both, but use empty string instead of None if one of them is empty
# this allows the field that is empty to actually be cleared byt he UI instead of leaving the previous value
if self.positive_prompt or self.negative_prompt:
prop_dict["positive_prompt"] = "" if self.positive_prompt is None else self.positive_prompt
prop_dict["negative_prompt"] = "" if self.negative_prompt is None else self.negative_prompt
prop_dict["width"] = self.width
prop_dict["height"] = self.height
# only render seed if it has a value to avoid ui thinking it can set this and then error
if self.seed:
prop_dict["seed"] = self.seed
prop_dict["rand_device"] = self.rand_device
prop_dict["cfg_scale"] = self.cfg_scale
prop_dict["steps"] = self.steps
prop_dict["scheduler"] = self.scheduler
prop_dict["clip_skip"] = 0
prop_dict["model"] = {}
prop_dict["model"]["model_name"] = self.model_name
prop_dict["model"]["base_model"] = None
prop_dict["controlnets"] = []
prop_dict["loras"] = []
prop_dict["vae"] = None
prop_dict["strength"] = self.strength
prop_dict["init_image"] = self.init_image
prop_dict["positive_style_prompt"] = None
prop_dict["negative_style_prompt"] = None
prop_dict["refiner_model"] = None
prop_dict["refiner_cfg_scale"] = None
prop_dict["refiner_steps"] = None
prop_dict["refiner_scheduler"] = None
prop_dict["refiner_aesthetic_store"] = None
prop_dict["refiner_start"] = None
prop_dict["imported_app_version"] = self.imported_app_version
return json.dumps(prop_dict)
class InvokeAIMetadataParser:
"""Parses strings with json data to find Invoke AI core metadata properties."""
2023-08-05 16:44:58 +00:00
def __init__(self):
pass
def parse_meta_tag_dream(self, dream_string):
"""Take as input an png metadata json node for the 'dream' field variant from prior to 1.15"""
props = InvokeAIMetadata()
props.imported_app_version = "pre1.15"
seed_match = re.search("-S\\s*(\\d+)", dream_string)
if seed_match is not None:
try:
props.seed = int(seed_match[1])
except ValueError:
props.seed = None
raw_prompt = re.sub("(-S\\s*\\d+)", "", dream_string)
else:
raw_prompt = dream_string
pos_prompt, neg_prompt = self.split_prompt(raw_prompt)
props.positive_prompt = pos_prompt
props.negative_prompt = neg_prompt
return props
def parse_meta_tag_sd_metadata(self, tag_value):
"""Take as input an png metadata json node for the 'sd-metadata' field variant from 1.15 through 2.3.5 post 2"""
props = InvokeAIMetadata()
props.imported_app_version = tag_value.get("app_version")
props.model_name = tag_value.get("model_weights")
img_node = tag_value.get("image")
if img_node is not None:
props.generation_mode = img_node.get("type")
props.width = img_node.get("width")
props.height = img_node.get("height")
props.seed = img_node.get("seed")
2023-08-05 16:44:58 +00:00
props.rand_device = "cuda" # hardcoded since all generations pre 3.0 used cuda random noise instead of cpu
props.cfg_scale = img_node.get("cfg_scale")
props.steps = img_node.get("steps")
props.scheduler = self.map_scheduler(img_node.get("sampler"))
props.strength = img_node.get("strength")
if props.strength is None:
2023-08-05 16:44:58 +00:00
props.strength = img_node.get("strength_steps") # try second name for this property
props.init_image = img_node.get("init_image_path")
2023-08-05 16:44:58 +00:00
if props.init_image is None: # try second name for this property
props.init_image = img_node.get("init_img")
# remove the path info from init_image so if we move the init image, it will be correctly relative in the new location
if props.init_image is not None:
props.init_image = os.path.basename(props.init_image)
raw_prompt = img_node.get("prompt")
if isinstance(raw_prompt, list):
raw_prompt = raw_prompt[0].get("prompt")
props.positive_prompt, props.negative_prompt = self.split_prompt(raw_prompt)
return props
def parse_meta_tag_invokeai(self, tag_value):
"""Take as input an png metadata json node for the 'invokeai' field variant from 3.0.0 beta 1 through 5"""
props = InvokeAIMetadata()
props.imported_app_version = "3.0.0 or later"
props.generation_mode = tag_value.get("type")
if props.generation_mode is not None:
2023-08-05 16:44:58 +00:00
props.generation_mode = props.generation_mode.replace("t2l", "txt2img").replace("l2l", "img2img")
props.width = tag_value.get("width")
props.height = tag_value.get("height")
props.seed = tag_value.get("seed")
props.cfg_scale = tag_value.get("cfg_scale")
props.steps = tag_value.get("steps")
props.scheduler = tag_value.get("scheduler")
props.strength = tag_value.get("strength")
props.positive_prompt = tag_value.get("positive_conditioning")
props.negative_prompt = tag_value.get("negative_conditioning")
return props
def map_scheduler(self, old_scheduler):
"""Convert the legacy sampler names to matching 3.0 schedulers"""
# this was more elegant as a case statement, but that's not available in python 3.9
if old_scheduler is None:
return None
scheduler_map = dict(
ddim="ddim",
plms="pnmd",
k_lms="lms",
k_dpm_2="kdpm_2",
k_dpm_2_a="kdpm_2_a",
dpmpp_2="dpmpp_2s",
k_dpmpp_2="dpmpp_2m",
k_dpmpp_2_a=None, # invalid, in 2.3.x, selecting this sample would just fallback to last run or plms if new session
k_euler="euler",
k_euler_a="euler_a",
k_heun="heun",
)
return scheduler_map.get(old_scheduler)
def split_prompt(self, raw_prompt: str):
"""Split the unified prompt strings by extracting all negative prompt blocks out into the negative prompt."""
if raw_prompt is None:
return "", ""
2023-08-05 16:44:58 +00:00
raw_prompt_search = raw_prompt.replace("\r", "").replace("\n", "")
matches = re.findall(r"\[(.+?)\]", raw_prompt_search)
if len(matches) > 0:
negative_prompt = ""
if len(matches) == 1:
2023-08-05 16:44:58 +00:00
negative_prompt = matches[0].strip().strip(",")
else:
for match in matches:
negative_prompt += f"({match.strip().strip(',')})"
positive_prompt = re.sub(r"(\[.+?\])", "", raw_prompt_search).strip()
else:
positive_prompt = raw_prompt_search.strip()
negative_prompt = ""
2023-08-05 16:44:58 +00:00
return positive_prompt, negative_prompt
class DatabaseMapper:
"""Class to abstract database functionality."""
2023-08-05 16:44:58 +00:00
def __init__(self, database_path, database_backup_dir):
self.database_path = database_path
self.database_backup_dir = database_backup_dir
self.connection = None
self.cursor = None
def connect(self):
"""Open connection to the database."""
self.connection = sqlite3.connect(self.database_path)
self.cursor = self.connection.cursor()
def get_board_names(self):
"""Get a list of the current board names from the database."""
sql_get_board_name = "SELECT board_name FROM boards"
self.cursor.execute(sql_get_board_name)
rows = self.cursor.fetchall()
return [row[0] for row in rows]
def does_image_exist(self, image_name):
"""Check database if a image name already exists and return a boolean."""
sql_get_image_by_name = f"SELECT image_name FROM images WHERE image_name='{image_name}'"
self.cursor.execute(sql_get_image_by_name)
rows = self.cursor.fetchall()
return True if len(rows) > 0 else False
def add_new_image_to_database(self, filename, width, height, metadata, modified_date_string):
2023-08-05 16:44:58 +00:00
"""Add an image to the database."""
sql_add_image = f"""INSERT INTO images (image_name, image_origin, image_category, width, height, session_id, node_id, metadata, is_intermediate, created_at, updated_at)
VALUES ('{filename}', 'internal', 'general', {width}, {height}, null, null, '{metadata}', 0, '{modified_date_string}', '{modified_date_string}')"""
self.cursor.execute(sql_add_image)
self.connection.commit()
def get_board_id_with_create(self, board_name):
"""Get the board id for supplied name, and create the board if one does not exist."""
sql_find_board = f"SELECT board_id FROM boards WHERE board_name='{board_name}' COLLATE NOCASE"
self.cursor.execute(sql_find_board)
rows = self.cursor.fetchall()
2023-08-05 16:44:58 +00:00
if len(rows) > 0:
return rows[0][0]
else:
board_date_string = datetime.datetime.utcnow().date().isoformat()
new_board_id = str(uuid.uuid4())
sql_insert_board = f"INSERT INTO boards (board_id, board_name, created_at, updated_at) VALUES ('{new_board_id}', '{board_name}', '{board_date_string}', '{board_date_string}')"
self.cursor.execute(sql_insert_board)
self.connection.commit()
return new_board_id
def add_image_to_board(self, filename, board_id):
"""Add an image mapping to a board."""
add_datetime_str = datetime.datetime.utcnow().isoformat()
sql_add_image_to_board = f"""INSERT INTO board_images (board_id, image_name, created_at, updated_at)
VALUES ('{board_id}', '{filename}', '{add_datetime_str}', '{add_datetime_str}')"""
self.cursor.execute(sql_add_image_to_board)
self.connection.commit()
def disconnect(self):
"""Disconnect from the db, cleaning up connections and cursors."""
if self.cursor is not None:
self.cursor.close()
if self.connection is not None:
self.connection.close()
def backup(self, timestamp_string):
2023-08-05 16:44:58 +00:00
"""Take a backup of the database."""
if not os.path.exists(self.database_backup_dir):
2023-08-05 16:44:58 +00:00
print(f"Database backup directory {self.database_backup_dir} does not exist -> creating...", end="")
os.makedirs(self.database_backup_dir)
2023-08-05 16:44:58 +00:00
print("Done!")
database_backup_path = os.path.join(self.database_backup_dir, f"backup-{timestamp_string}-invokeai.db")
2023-08-05 16:44:58 +00:00
print(f"Making DB Backup at {database_backup_path}...", end="")
shutil.copy2(self.database_path, database_backup_path)
2023-08-05 16:44:58 +00:00
print("Done!")
class MediaImportProcessor:
"""Containing class for script functionality."""
2023-08-05 16:44:58 +00:00
def __init__(self):
pass
board_name_id_map = {}
def get_import_file_list(self):
"""Ask the user for the import folder and scan for the list of files to return."""
while True:
2023-08-05 16:44:58 +00:00
default = ""
while True:
import_dir = os.path.expanduser(
prompt(
"Inputs: Specify absolute path containing InvokeAI .png images to import: ",
2023-08-05 16:44:58 +00:00
completer=PathCompleter(expanduser=True, only_directories=True),
default=default,
)
)
if len(import_dir) > 0 and Path(import_dir).is_dir():
break
default = import_dir
2023-08-05 16:44:58 +00:00
recurse_directories = (
(prompt("Include files from subfolders recursively [yN]? ").strip() or "N").upper().startswith("N")
)
if recurse_directories:
is_recurse = False
2023-08-05 16:44:58 +00:00
matching_file_list = glob.glob(import_dir + "/*.png", recursive=False)
else:
is_recurse = True
2023-08-05 16:44:58 +00:00
matching_file_list = glob.glob(import_dir + "/**/*.png", recursive=True)
if len(matching_file_list) > 0:
return import_dir, is_recurse, matching_file_list
else:
print(f"The specific path {import_dir} exists, but does not contain .png files!")
def get_file_details(self, filepath):
"""Retrieve the embedded metedata fields and dimensions from an image file."""
with PIL.Image.open(filepath) as img:
img.load()
png_width, png_height = img.size
img_info = img.info
return img_info, png_width, png_height
def select_board_option(self, board_names, timestamp_string):
"""Allow the user to choose how a board is selected for imported files."""
while True:
print("\r\nOptions for board selection for imported images:")
print(f"1) Select an existing board name. (found {len(board_names)})")
2023-08-05 16:44:58 +00:00
print("2) Specify a board name to create/add to.")
print("3) Create/add to board named 'IMPORT'.")
print(
f"4) Create/add to board named 'IMPORT' with the current datetime string appended (.e.g IMPORT_{timestamp_string})."
)
print(
"5) Create/add to board named 'IMPORT' with a the original file app_version appended (.e.g IMPORT_2.2.5)."
)
input_option = input("Specify desired board option: ")
# This was more elegant as a case statement, but not supported in python 3.9
if input_option == "1":
if len(board_names) < 1:
print("\r\nThere are no existing board names to choose from. Select another option!")
continue
board_name = self.select_item_from_list(
board_names, "board name", True, "Cancel, go back and choose a different board option."
)
if board_name is not None:
return board_name
elif input_option == "2":
while True:
board_name = input("Specify new/existing board name: ")
if board_name:
return board_name
elif input_option == "3":
return "IMPORT"
elif input_option == "4":
return f"IMPORT_{timestamp_string}"
elif input_option == "5":
return "IMPORT_APPVERSION"
def select_item_from_list(self, items, entity_name, allow_cancel, cancel_string):
"""A general function to render a list of items to select in the console, prompt the user for a selection and ensure a valid entry is selected."""
2023-08-05 16:44:58 +00:00
print(f"Select a {entity_name.lower()} from the following list:")
index = 1
for item in items:
print(f"{index}) {item}")
index += 1
if allow_cancel:
print(f"{index}) {cancel_string}")
while True:
try:
option_number = int(input("Specify number of selection: "))
except ValueError:
continue
if allow_cancel and option_number == index:
return None
2023-08-05 16:44:58 +00:00
if option_number >= 1 and option_number <= len(items):
return items[option_number - 1]
def import_image(self, filepath: str, board_name_option: str, db_mapper: DatabaseMapper, config: Config):
"""Import a single file by its path"""
parser = InvokeAIMetadataParser()
file_name = os.path.basename(filepath)
file_destination_path = os.path.join(config.outputs_path, file_name)
print("===============================================================================")
print(f"Importing {filepath}")
# check destination to see if the file was previously imported
if os.path.exists(file_destination_path):
print("File already exists in the destination, skipping!")
ImportStats.count_skipped_file_exists += 1
return
# check if file name is already referenced in the database
if db_mapper.does_image_exist(file_name):
print("A reference to a file with this name already exists in the database, skipping!")
ImportStats.count_skipped_db_exists += 1
return
# load image info and dimensions
img_info, png_width, png_height = self.get_file_details(filepath)
# parse metadata
destination_needs_meta_update = True
log_version_note = "(Unknown)"
if "invokeai_metadata" in img_info:
# for the latest, we will just re-emit the same json, no need to parse/modify
converted_field = None
latest_json_string = img_info.get("invokeai_metadata")
log_version_note = "3.0.0+"
destination_needs_meta_update = False
else:
if "sd-metadata" in img_info:
converted_field = parser.parse_meta_tag_sd_metadata(json.loads(img_info.get("sd-metadata")))
elif "invokeai" in img_info:
converted_field = parser.parse_meta_tag_invokeai(json.loads(img_info.get("invokeai")))
elif "dream" in img_info:
converted_field = parser.parse_meta_tag_dream(img_info.get("dream"))
elif "Dream" in img_info:
converted_field = parser.parse_meta_tag_dream(img_info.get("Dream"))
else:
converted_field = InvokeAIMetadata()
destination_needs_meta_update = False
print("File does not have metadata from known Invoke AI versions, add only, no update!")
# use the loaded img dimensions if the metadata didnt have them
if converted_field.width is None:
converted_field.width = png_width
if converted_field.height is None:
converted_field.height = png_height
log_version_note = converted_field.imported_app_version if converted_field else "NoVersion"
log_version_note = log_version_note or "NoVersion"
latest_json_string = converted_field.to_json()
2023-08-05 16:44:58 +00:00
print(f"From Invoke AI Version {log_version_note} with dimensions {png_width} x {png_height}.")
# if metadata needs update, then update metdata and copy in one shot
if destination_needs_meta_update:
print("Updating metadata while copying...", end="")
2023-08-05 16:44:58 +00:00
self.update_file_metadata_while_copying(
filepath, file_destination_path, "invokeai_metadata", latest_json_string
)
print("Done!")
else:
print("No metadata update necessary, copying only...", end="")
shutil.copy2(filepath, file_destination_path)
print("Done!")
# create thumbnail
print("Creating thumbnail...", end="")
2023-08-05 16:44:58 +00:00
thumbnail_path = os.path.join(config.thumbnail_path, os.path.splitext(file_name)[0]) + ".webp"
thumbnail_size = 256, 256
with PIL.Image.open(filepath) as source_image:
source_image.thumbnail(thumbnail_size)
source_image.save(thumbnail_path, "webp")
print("Done!")
# finalize the dynamic board name if there is an APPVERSION token in it.
if converted_field is not None:
board_name = board_name_option.replace("APPVERSION", converted_field.imported_app_version or "NoVersion")
else:
board_name = board_name_option.replace("APPVERSION", "Latest")
# maintain a map of alrady created/looked up ids to avoid DB queries
print("Finding/Creating board...", end="")
if board_name in self.board_name_id_map:
board_id = self.board_name_id_map[board_name]
else:
board_id = db_mapper.get_board_id_with_create(board_name)
self.board_name_id_map[board_name] = board_id
print("Done!")
# add image to db
print("Adding image to database......", end="")
modified_time = datetime.datetime.utcfromtimestamp(os.path.getmtime(filepath))
db_mapper.add_new_image_to_database(file_name, png_width, png_height, latest_json_string, modified_time)
print("Done!")
2023-08-05 16:44:58 +00:00
# add image to board
print("Adding image to board......", end="")
db_mapper.add_image_to_board(file_name, board_id)
print("Done!")
ImportStats.count_imported += 1
if log_version_note in ImportStats.count_imported_by_version:
ImportStats.count_imported_by_version[log_version_note] += 1
else:
ImportStats.count_imported_by_version[log_version_note] = 1
def update_file_metadata_while_copying(self, filepath, file_destination_path, tag_name, tag_value):
"""Perform a metadata update with save to a new destination which accomplishes a copy while updating metadata."""
with PIL.Image.open(filepath) as target_image:
existing_img_info = target_image.info
metadata = PIL.PngImagePlugin.PngInfo()
# re-add any existing invoke ai tags unless they are the one we are trying to add
for key in existing_img_info:
if key != tag_name and key in ("dream", "Dream", "sd-metadata", "invokeai", "invokeai_metadata"):
2023-08-05 16:44:58 +00:00
metadata.add_text(key, existing_img_info[key])
metadata.add_text(tag_name, tag_value)
target_image.save(file_destination_path, pnginfo=metadata)
def process(self):
"""Begin main processing."""
print("===============================================================================")
print("This script will import images generated by earlier versions of")
print("InvokeAI into the currently installed root directory:")
2023-08-05 16:44:58 +00:00
print(f" {app_config.root_path}")
print("If this is not what you want to do, type ctrl-C now to cancel.")
# load config
print("===============================================================================")
print("= Configuration & Settings")
config = Config()
config.find_and_load()
db_mapper = DatabaseMapper(config.database_path, config.database_backup_dir)
db_mapper.connect()
import_dir, is_recurse, import_file_list = self.get_import_file_list()
ImportStats.count_source_files = len(import_file_list)
board_names = db_mapper.get_board_names()
board_name_option = self.select_board_option(board_names, config.TIMESTAMP_STRING)
print("\r\n===============================================================================")
print("= Import Settings Confirmation")
print()
print(f"Database File Path : {config.database_path}")
print(f"Outputs/Images Directory : {config.outputs_path}")
print(f"Import Image Source Directory : {import_dir}")
print(f" Recurse Source SubDirectories : {'Yes' if is_recurse else 'No'}")
print(f"Count of .png file(s) found : {len(import_file_list)}")
print(f"Board name option specified : {board_name_option}")
print(f"Database backup will be taken at : {config.database_backup_dir}")
print("\r\nNotes about the import process:")
print("- Source image files will not be modified, only copied to the outputs directory.")
print("- If the same file name already exists in the destination, the file will be skipped.")
print("- If the same file name already has a record in the database, the file will be skipped.")
print("- Invoke AI metadata tags will be updated/written into the imported copy only.")
2023-08-05 16:44:58 +00:00
print(
"- On the imported copy, only Invoke AI known tags (latest and legacy) will be retained (dream, sd-metadata, invokeai, invokeai_metadata)"
)
print(
"- A property 'imported_app_version' will be added to metadata that can be viewed in the UI's metadata viewer."
)
print(
"- The new 3.x InvokeAI outputs folder structure is flat so recursively found source imges will all be placed into the single outputs/images folder."
)
while True:
2023-08-05 16:44:58 +00:00
should_continue = prompt("\nDo you wish to continue with the import [Yn] ? ").lower() or "y"
if should_continue == "n":
print("\r\nCancelling Import")
return
2023-08-05 16:44:58 +00:00
elif should_continue == "y":
print()
break
db_mapper.backup(config.TIMESTAMP_STRING)
print()
ImportStats.time_start = datetime.datetime.utcnow()
for filepath in import_file_list:
try:
self.import_image(filepath, board_name_option, db_mapper, config)
except sqlite3.Error as sql_ex:
print(f"A database related exception was found processing {filepath}, will continue to next file. ")
print("Exception detail:")
print(sql_ex)
ImportStats.count_file_errors += 1
except Exception as ex:
print(f"Exception processing {filepath}, will continue to next file. ")
print("Exception detail:")
print(ex)
ImportStats.count_file_errors += 1
print("\r\n===============================================================================")
print(f"= Import Complete - Elpased Time: {ImportStats.get_elapsed_time_string()}")
print()
print(f"Source File(s) : {ImportStats.count_source_files}")
print(f"Total Imported : {ImportStats.count_imported}")
print(f"Skipped b/c file already exists on disk : {ImportStats.count_skipped_file_exists}")
print(f"Skipped b/c file already exists in db : {ImportStats.count_skipped_db_exists}")
print(f"Errors during import : {ImportStats.count_file_errors}")
if ImportStats.count_imported > 0:
print("\r\nBreakdown of imported files by version:")
2023-08-05 16:44:58 +00:00
for key, version in ImportStats.count_imported_by_version.items():
print(f" {key:20} : {version}")
2023-08-05 16:44:58 +00:00
def main():
try:
processor = MediaImportProcessor()
processor.process()
except KeyboardInterrupt:
print("\r\n\r\nUser cancelled execution.")
2023-08-05 16:44:58 +00:00
if __name__ == "__main__":
main()