mirror of
https://github.com/invoke-ai/InvokeAI
synced 2024-08-30 20:32:17 +00:00
refactoring complete; please test carefully!
This commit is contained in:
251
scripts/dream.py
251
scripts/dream.py
@ -3,18 +3,10 @@
|
||||
|
||||
import argparse
|
||||
import shlex
|
||||
import atexit
|
||||
import os
|
||||
import sys
|
||||
import copy
|
||||
from PIL import Image,PngImagePlugin
|
||||
|
||||
# readline unavailable on windows systems
|
||||
try:
|
||||
import readline
|
||||
readline_available = True
|
||||
except:
|
||||
readline_available = False
|
||||
from ldm.dream_util import Completer,PngWriter,PromptFormatter
|
||||
|
||||
debugging = False
|
||||
|
||||
@ -35,10 +27,6 @@ def main():
|
||||
config = "configs/stable-diffusion/v1-inference.yaml"
|
||||
weights = "models/ldm/stable-diffusion-v1/model.ckpt"
|
||||
|
||||
# command line history will be stored in a file called "~/.dream_history"
|
||||
if readline_available:
|
||||
setup_readline()
|
||||
|
||||
print("* Initializing, be patient...\n")
|
||||
sys.path.append('.')
|
||||
from pytorch_lightning import logging
|
||||
@ -54,8 +42,6 @@ def main():
|
||||
# the user input loop
|
||||
t2i = T2I(width=width,
|
||||
height=height,
|
||||
batch_size=opt.batch_size,
|
||||
outdir=opt.outdir,
|
||||
sampler_name=opt.sampler_name,
|
||||
weights=weights,
|
||||
full_precision=opt.full_precision,
|
||||
@ -87,13 +73,13 @@ def main():
|
||||
log_path = os.path.join(opt.outdir,'dream_log.txt')
|
||||
with open(log_path,'a') as log:
|
||||
cmd_parser = create_cmd_parser()
|
||||
main_loop(t2i,cmd_parser,log,infile)
|
||||
main_loop(t2i,opt.outdir,cmd_parser,log,infile)
|
||||
log.close()
|
||||
if infile:
|
||||
infile.close()
|
||||
|
||||
|
||||
def main_loop(t2i,parser,log,infile):
|
||||
def main_loop(t2i,outdir,parser,log,infile):
|
||||
''' prompt/read/execute loop '''
|
||||
done = False
|
||||
|
||||
@ -131,13 +117,13 @@ def main_loop(t2i,parser,log,infile):
|
||||
if elements[0]=='cd' and len(elements)>1:
|
||||
if os.path.exists(elements[1]):
|
||||
print(f"setting image output directory to {elements[1]}")
|
||||
t2i.outdir=elements[1]
|
||||
outdir=elements[1]
|
||||
else:
|
||||
print(f"directory {elements[1]} does not exist")
|
||||
continue
|
||||
|
||||
if elements[0]=='pwd':
|
||||
print(f"current output directory is {t2i.outdir}")
|
||||
print(f"current output directory is {outdir}")
|
||||
continue
|
||||
|
||||
if elements[0].startswith('!dream'): # in case a stored prompt still contains the !dream command
|
||||
@ -166,117 +152,77 @@ def main_loop(t2i,parser,log,infile):
|
||||
print("Try again with a prompt!")
|
||||
continue
|
||||
|
||||
normalized_prompt = PromptFormatter(t2i,opt).normalize_prompt()
|
||||
individual_images = not opt.grid
|
||||
|
||||
try:
|
||||
if opt.init_img is None:
|
||||
results = t2i.txt2img(**vars(opt))
|
||||
else:
|
||||
assert os.path.exists(opt.init_img),f"No file found at {opt.init_img}. On Linux systems, pressing <tab> after -I will autocomplete a list of possible image files."
|
||||
if None not in (opt.width,opt.height):
|
||||
print('Warning: width and height options are ignored when modifying an init image')
|
||||
results = t2i.img2img(**vars(opt))
|
||||
file_writer = PngWriter(outdir,normalized_prompt,opt.batch_size)
|
||||
callback = file_writer.write_image if individual_images else None
|
||||
|
||||
image_list = t2i.prompt2image(image_callback=callback,**vars(opt))
|
||||
results = file_writer.files_written if individual_images else image_list
|
||||
|
||||
if opt.grid and len(results) > 0:
|
||||
grid_img = file_writer.make_grid([r[0] for r in results])
|
||||
filename = file_writer.unique_filename(results[0][1])
|
||||
seeds = [a[1] for a in results]
|
||||
results = [[filename,seeds]]
|
||||
metadata_prompt = f'{normalized_prompt} -S{results[0][1]}'
|
||||
file_writer.save_image_and_prompt_to_png(grid_img,metadata_prompt,filename)
|
||||
|
||||
except AssertionError as e:
|
||||
print(e)
|
||||
continue
|
||||
|
||||
|
||||
allVariantResults = []
|
||||
if opt.variants is not None:
|
||||
print(f"Generating {opt.variants} variant(s)...")
|
||||
newopt = copy.deepcopy(opt)
|
||||
newopt.iterations = 1
|
||||
newopt.variants = None
|
||||
for r in results:
|
||||
newopt.init_img = r[0]
|
||||
print(f"\t generating variant for {newopt.init_img}")
|
||||
for j in range(0, opt.variants):
|
||||
try:
|
||||
variantResults = t2i.img2img(**vars(newopt))
|
||||
allVariantResults.append([newopt,variantResults])
|
||||
except AssertionError as e:
|
||||
print(e)
|
||||
continue
|
||||
print(f"{opt.variants} Variants generated!")
|
||||
except OSError as e:
|
||||
print(e)
|
||||
continue
|
||||
|
||||
print("Outputs:")
|
||||
write_log_message(t2i,opt,results,log)
|
||||
|
||||
if allVariantResults:
|
||||
print("Variant outputs:")
|
||||
for vr in allVariantResults:
|
||||
write_log_message(t2i,vr[0],vr[1],log)
|
||||
|
||||
write_log_message(t2i,normalized_prompt,results,log)
|
||||
|
||||
print("goodbye!")
|
||||
|
||||
# variant generation is going to be superseded by a generalized
|
||||
# "prompt-morph" functionality
|
||||
# def generate_variants(t2i,outdir,opt,previous_gens):
|
||||
# variants = []
|
||||
# print(f"Generating {opt.variants} variant(s)...")
|
||||
# newopt = copy.deepcopy(opt)
|
||||
# newopt.iterations = 1
|
||||
# newopt.variants = None
|
||||
# for r in previous_gens:
|
||||
# newopt.init_img = r[0]
|
||||
# prompt = PromptFormatter(t2i,newopt).normalize_prompt()
|
||||
# print(f"] generating variant for {newopt.init_img}")
|
||||
# for j in range(0,opt.variants):
|
||||
# try:
|
||||
# file_writer = PngWriter(outdir,prompt,newopt.batch_size)
|
||||
# callback = file_writer.write_image
|
||||
# t2i.prompt2image(image_callback=callback,**vars(newopt))
|
||||
# results = file_writer.files_written
|
||||
# variants.append([prompt,results])
|
||||
# except AssertionError as e:
|
||||
# print(e)
|
||||
# continue
|
||||
# print(f'{opt.variants} variants generated')
|
||||
# return variants
|
||||
|
||||
|
||||
def write_log_message(t2i,opt,results,logfile):
|
||||
''' logs the name of the output image, its prompt and seed to the terminal, log file, and a Dream text chunk in the PNG metadata '''
|
||||
switches = _reconstruct_switches(t2i,opt)
|
||||
prompt_str = ' '.join(switches)
|
||||
|
||||
# when multiple images are produced in batch, then we keep track of where each starts
|
||||
def write_log_message(t2i,prompt,results,logfile):
|
||||
''' logs the name of the output image, its prompt and seed to the terminal, log file, and a Dream text chunk in the PNG metadata'''
|
||||
last_seed = None
|
||||
img_num = 1
|
||||
batch_size = opt.batch_size or t2i.batch_size
|
||||
seenit = {}
|
||||
|
||||
seeds = [a[1] for a in results]
|
||||
if batch_size > 1:
|
||||
seeds = f"(seeds for each batch row: {seeds})"
|
||||
else:
|
||||
seeds = f"(seeds for individual images: {seeds})"
|
||||
|
||||
for r in results:
|
||||
seed = r[1]
|
||||
log_message = (f'{r[0]}: {prompt_str} -S{seed}')
|
||||
log_message = (f'{r[0]}: {prompt} -S{seed}')
|
||||
|
||||
if batch_size > 1:
|
||||
if seed != last_seed:
|
||||
img_num = 1
|
||||
log_message += f' # (batch image {img_num} of {batch_size})'
|
||||
else:
|
||||
img_num += 1
|
||||
log_message += f' # (batch image {img_num} of {batch_size})'
|
||||
last_seed = seed
|
||||
print(log_message)
|
||||
logfile.write(log_message+"\n")
|
||||
logfile.flush()
|
||||
if r[0] not in seenit:
|
||||
seenit[r[0]] = True
|
||||
try:
|
||||
if opt.grid:
|
||||
_write_prompt_to_png(r[0],f'{prompt_str} -g -S{seed} {seeds}')
|
||||
else:
|
||||
_write_prompt_to_png(r[0],f'{prompt_str} -S{seed}')
|
||||
except FileNotFoundError:
|
||||
print(f"Could not open file '{r[0]}' for reading")
|
||||
|
||||
def _reconstruct_switches(t2i,opt):
|
||||
'''Normalize the prompt and switches'''
|
||||
switches = list()
|
||||
switches.append(f'"{opt.prompt}"')
|
||||
switches.append(f'-s{opt.steps or t2i.steps}')
|
||||
switches.append(f'-b{opt.batch_size or t2i.batch_size}')
|
||||
switches.append(f'-W{opt.width or t2i.width}')
|
||||
switches.append(f'-H{opt.height or t2i.height}')
|
||||
switches.append(f'-C{opt.cfg_scale or t2i.cfg_scale}')
|
||||
switches.append(f'-m{t2i.sampler_name}')
|
||||
if opt.variants:
|
||||
switches.append(f'-v{opt.variants}')
|
||||
if opt.init_img:
|
||||
switches.append(f'-I{opt.init_img}')
|
||||
if opt.strength and opt.init_img is not None:
|
||||
switches.append(f'-f{opt.strength or t2i.strength}')
|
||||
if t2i.full_precision:
|
||||
switches.append('-F')
|
||||
return switches
|
||||
|
||||
def _write_prompt_to_png(path,prompt):
|
||||
info = PngImagePlugin.PngInfo()
|
||||
info.add_text("Dream",prompt)
|
||||
im = Image.open(path)
|
||||
im.save(path,"PNG",pnginfo=info)
|
||||
|
||||
def create_argv_parser():
|
||||
parser = argparse.ArgumentParser(description="Parse script's command line args")
|
||||
parser.add_argument("--laion400m",
|
||||
@ -297,10 +243,6 @@ def create_argv_parser():
|
||||
dest='full_precision',
|
||||
action='store_true',
|
||||
help="use slower full precision math for calculations")
|
||||
parser.add_argument('-b','--batch_size',
|
||||
type=int,
|
||||
default=1,
|
||||
help="number of images to produce per iteration (faster, but doesn't generate individual seeds")
|
||||
parser.add_argument('--sampler','-m',
|
||||
dest="sampler_name",
|
||||
choices=['ddim', 'k_dpm_2_a', 'k_dpm_2', 'k_euler_a', 'k_euler', 'k_heun', 'k_lms', 'plms'],
|
||||
@ -336,93 +278,12 @@ def create_cmd_parser():
|
||||
parser.add_argument('-i','--individual',action='store_true',help="generate individual files (default)")
|
||||
parser.add_argument('-I','--init_img',type=str,help="path to input image for img2img mode (supersedes width and height)")
|
||||
parser.add_argument('-f','--strength',default=0.75,type=float,help="strength for noising/unnoising. 0.0 preserves image exactly, 1.0 replaces it completely")
|
||||
parser.add_argument('-v','--variants',type=int,help="in img2img mode, the first generated image will get passed back to img2img to generate the requested number of variants")
|
||||
# variants is going to be superseded by a generalized "prompt-morph" function
|
||||
# parser.add_argument('-v','--variants',type=int,help="in img2img mode, the first generated image will get passed back to img2img to generate the requested number of variants")
|
||||
parser.add_argument('-x','--skip_normalize',action='store_true',help="skip subprompt weight normalization")
|
||||
return parser
|
||||
|
||||
if readline_available:
|
||||
def setup_readline():
|
||||
readline.set_completer(Completer(['cd','pwd',
|
||||
'--steps','-s','--seed','-S','--iterations','-n','--batch_size','-b',
|
||||
'--width','-W','--height','-H','--cfg_scale','-C','--grid','-g',
|
||||
'--individual','-i','--init_img','-I','--strength','-f','-v','--variants']).complete)
|
||||
readline.set_completer_delims(" ")
|
||||
readline.parse_and_bind('tab: complete')
|
||||
load_history()
|
||||
|
||||
def load_history():
|
||||
histfile = os.path.join(os.path.expanduser('~'),".dream_history")
|
||||
try:
|
||||
readline.read_history_file(histfile)
|
||||
readline.set_history_length(1000)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
atexit.register(readline.write_history_file,histfile)
|
||||
|
||||
class Completer():
|
||||
def __init__(self,options):
|
||||
self.options = sorted(options)
|
||||
return
|
||||
|
||||
def complete(self,text,state):
|
||||
buffer = readline.get_line_buffer()
|
||||
|
||||
if text.startswith(('-I','--init_img')):
|
||||
return self._path_completions(text,state,('.png'))
|
||||
|
||||
if buffer.strip().endswith('cd') or text.startswith(('.','/')):
|
||||
return self._path_completions(text,state,())
|
||||
|
||||
response = None
|
||||
if state == 0:
|
||||
# This is the first time for this text, so build a match list.
|
||||
if text:
|
||||
self.matches = [s
|
||||
for s in self.options
|
||||
if s and s.startswith(text)]
|
||||
else:
|
||||
self.matches = self.options[:]
|
||||
|
||||
# Return the state'th item from the match list,
|
||||
# if we have that many.
|
||||
try:
|
||||
response = self.matches[state]
|
||||
except IndexError:
|
||||
response = None
|
||||
return response
|
||||
|
||||
def _path_completions(self,text,state,extensions):
|
||||
# get the path so far
|
||||
if text.startswith('-I'):
|
||||
path = text.replace('-I','',1).lstrip()
|
||||
elif text.startswith('--init_img='):
|
||||
path = text.replace('--init_img=','',1).lstrip()
|
||||
else:
|
||||
path = text
|
||||
|
||||
matches = list()
|
||||
|
||||
path = os.path.expanduser(path)
|
||||
if len(path)==0:
|
||||
matches.append(text+'./')
|
||||
else:
|
||||
dir = os.path.dirname(path)
|
||||
dir_list = os.listdir(dir)
|
||||
for n in dir_list:
|
||||
if n.startswith('.') and len(n)>1:
|
||||
continue
|
||||
full_path = os.path.join(dir,n)
|
||||
if full_path.startswith(path):
|
||||
if os.path.isdir(full_path):
|
||||
matches.append(os.path.join(os.path.dirname(text),n)+'/')
|
||||
elif n.endswith(extensions):
|
||||
matches.append(os.path.join(os.path.dirname(text),n))
|
||||
|
||||
try:
|
||||
response = matches[state]
|
||||
except IndexError:
|
||||
response = None
|
||||
return response
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
Reference in New Issue
Block a user