mirror of
https://github.com/invoke-ai/InvokeAI
synced 2024-08-30 20:32:17 +00:00
6487e7d906
orphaned since #3550 removed the LazilyLoadedModelGroup code, probably unused since ModelCache took over responsibility for sequential offload somewhere around #3335.
247 lines
9.4 KiB
Python
247 lines
9.4 KiB
Python
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
|
|
|
|
from contextlib import contextmanager, ContextDecorator
|
|
from functools import partial
|
|
from typing import Literal, Optional, get_args
|
|
|
|
from pydantic import Field
|
|
|
|
from invokeai.app.models.image import ColorField, ImageCategory, ImageField, ResourceOrigin
|
|
from invokeai.app.util.misc import SEED_MAX, get_random_seed
|
|
from invokeai.backend.generator.inpaint import infill_methods
|
|
from .baseinvocation import BaseInvocation, InvocationConfig, InvocationContext
|
|
from .compel import ConditioningField
|
|
from .image import ImageOutput
|
|
from .model import UNetField, VaeField
|
|
from ..util.step_callback import stable_diffusion_step_callback
|
|
from ...backend.generator import Inpaint, InvokeAIGenerator
|
|
from ...backend.model_management.lora import ModelPatcher
|
|
from ...backend.stable_diffusion import PipelineIntermediateState
|
|
from ...backend.stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline
|
|
|
|
SAMPLER_NAME_VALUES = Literal[tuple(InvokeAIGenerator.schedulers())]
|
|
INFILL_METHODS = Literal[tuple(infill_methods())]
|
|
DEFAULT_INFILL_METHOD = "patchmatch" if "patchmatch" in get_args(INFILL_METHODS) else "tile"
|
|
|
|
|
|
from .latent import get_scheduler
|
|
|
|
|
|
class OldModelContext(ContextDecorator):
|
|
model: StableDiffusionGeneratorPipeline
|
|
|
|
def __init__(self, model):
|
|
self.model = model
|
|
|
|
def __enter__(self):
|
|
return self.model
|
|
|
|
def __exit__(self, *exc):
|
|
return False
|
|
|
|
|
|
class OldModelInfo:
|
|
name: str
|
|
hash: str
|
|
context: OldModelContext
|
|
|
|
def __init__(self, name: str, hash: str, model: StableDiffusionGeneratorPipeline):
|
|
self.name = name
|
|
self.hash = hash
|
|
self.context = OldModelContext(
|
|
model=model,
|
|
)
|
|
|
|
|
|
class InpaintInvocation(BaseInvocation):
|
|
"""Generates an image using inpaint."""
|
|
|
|
type: Literal["inpaint"] = "inpaint"
|
|
|
|
positive_conditioning: Optional[ConditioningField] = Field(description="Positive conditioning for generation")
|
|
negative_conditioning: Optional[ConditioningField] = Field(description="Negative conditioning for generation")
|
|
seed: int = Field(
|
|
ge=0, le=SEED_MAX, description="The seed to use (omit for random)", default_factory=get_random_seed
|
|
)
|
|
steps: int = Field(default=30, gt=0, description="The number of steps to use to generate the image")
|
|
width: int = Field(
|
|
default=512,
|
|
multiple_of=8,
|
|
gt=0,
|
|
description="The width of the resulting image",
|
|
)
|
|
height: int = Field(
|
|
default=512,
|
|
multiple_of=8,
|
|
gt=0,
|
|
description="The height of the resulting image",
|
|
)
|
|
cfg_scale: float = Field(
|
|
default=7.5,
|
|
ge=1,
|
|
description="The Classifier-Free Guidance, higher values may result in a result closer to the prompt",
|
|
)
|
|
scheduler: SAMPLER_NAME_VALUES = Field(default="euler", description="The scheduler to use")
|
|
unet: UNetField = Field(default=None, description="UNet model")
|
|
vae: VaeField = Field(default=None, description="Vae model")
|
|
|
|
# Inputs
|
|
image: Optional[ImageField] = Field(description="The input image")
|
|
strength: float = Field(default=0.75, gt=0, le=1, description="The strength of the original image")
|
|
fit: bool = Field(
|
|
default=True,
|
|
description="Whether or not the result should be fit to the aspect ratio of the input image",
|
|
)
|
|
|
|
# Inputs
|
|
mask: Optional[ImageField] = Field(description="The mask")
|
|
seam_size: int = Field(default=96, ge=1, description="The seam inpaint size (px)")
|
|
seam_blur: int = Field(default=16, ge=0, description="The seam inpaint blur radius (px)")
|
|
seam_strength: float = Field(default=0.75, gt=0, le=1, description="The seam inpaint strength")
|
|
seam_steps: int = Field(default=30, ge=1, description="The number of steps to use for seam inpaint")
|
|
tile_size: int = Field(default=32, ge=1, description="The tile infill method size (px)")
|
|
infill_method: INFILL_METHODS = Field(
|
|
default=DEFAULT_INFILL_METHOD,
|
|
description="The method used to infill empty regions (px)",
|
|
)
|
|
inpaint_width: Optional[int] = Field(
|
|
default=None,
|
|
multiple_of=8,
|
|
gt=0,
|
|
description="The width of the inpaint region (px)",
|
|
)
|
|
inpaint_height: Optional[int] = Field(
|
|
default=None,
|
|
multiple_of=8,
|
|
gt=0,
|
|
description="The height of the inpaint region (px)",
|
|
)
|
|
inpaint_fill: Optional[ColorField] = Field(
|
|
default=ColorField(r=127, g=127, b=127, a=255),
|
|
description="The solid infill method color",
|
|
)
|
|
inpaint_replace: float = Field(
|
|
default=0.0,
|
|
ge=0.0,
|
|
le=1.0,
|
|
description="The amount by which to replace masked areas with latent noise",
|
|
)
|
|
|
|
# Schema customisation
|
|
class Config(InvocationConfig):
|
|
schema_extra = {
|
|
"ui": {"tags": ["stable-diffusion", "image"], "title": "Inpaint"},
|
|
}
|
|
|
|
def dispatch_progress(
|
|
self,
|
|
context: InvocationContext,
|
|
source_node_id: str,
|
|
intermediate_state: PipelineIntermediateState,
|
|
) -> None:
|
|
stable_diffusion_step_callback(
|
|
context=context,
|
|
intermediate_state=intermediate_state,
|
|
node=self.dict(),
|
|
source_node_id=source_node_id,
|
|
)
|
|
|
|
def get_conditioning(self, context, unet):
|
|
positive_cond_data = context.services.latents.get(self.positive_conditioning.conditioning_name)
|
|
c = positive_cond_data.conditionings[0].embeds.to(device=unet.device, dtype=unet.dtype)
|
|
extra_conditioning_info = positive_cond_data.conditionings[0].extra_conditioning
|
|
|
|
negative_cond_data = context.services.latents.get(self.negative_conditioning.conditioning_name)
|
|
uc = negative_cond_data.conditionings[0].embeds.to(device=unet.device, dtype=unet.dtype)
|
|
|
|
return (uc, c, extra_conditioning_info)
|
|
|
|
@contextmanager
|
|
def load_model_old_way(self, context, scheduler):
|
|
def _lora_loader():
|
|
for lora in self.unet.loras:
|
|
lora_info = context.services.model_manager.get_model(
|
|
**lora.dict(exclude={"weight"}),
|
|
context=context,
|
|
)
|
|
yield (lora_info.context.model, lora.weight)
|
|
del lora_info
|
|
return
|
|
|
|
unet_info = context.services.model_manager.get_model(
|
|
**self.unet.unet.dict(),
|
|
context=context,
|
|
)
|
|
vae_info = context.services.model_manager.get_model(
|
|
**self.vae.vae.dict(),
|
|
context=context,
|
|
)
|
|
|
|
with vae_info as vae, ModelPatcher.apply_lora_unet(unet_info.context.model, _lora_loader()), unet_info as unet:
|
|
device = context.services.model_manager.mgr.cache.execution_device
|
|
dtype = context.services.model_manager.mgr.cache.precision
|
|
|
|
pipeline = StableDiffusionGeneratorPipeline(
|
|
vae=vae,
|
|
text_encoder=None,
|
|
tokenizer=None,
|
|
unet=unet,
|
|
scheduler=scheduler,
|
|
safety_checker=None,
|
|
feature_extractor=None,
|
|
requires_safety_checker=False,
|
|
)
|
|
|
|
yield OldModelInfo(
|
|
name=self.unet.unet.model_name,
|
|
hash="<NO-HASH>",
|
|
model=pipeline,
|
|
)
|
|
|
|
def invoke(self, context: InvocationContext) -> ImageOutput:
|
|
image = None if self.image is None else context.services.images.get_pil_image(self.image.image_name)
|
|
mask = None if self.mask is None else context.services.images.get_pil_image(self.mask.image_name)
|
|
|
|
# Get the source node id (we are invoking the prepared node)
|
|
graph_execution_state = context.services.graph_execution_manager.get(context.graph_execution_state_id)
|
|
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
|
|
|
|
scheduler = get_scheduler(
|
|
context=context,
|
|
scheduler_info=self.unet.scheduler,
|
|
scheduler_name=self.scheduler,
|
|
)
|
|
|
|
with self.load_model_old_way(context, scheduler) as model:
|
|
conditioning = self.get_conditioning(context, model.context.model.unet)
|
|
|
|
outputs = Inpaint(model).generate(
|
|
conditioning=conditioning,
|
|
scheduler=scheduler,
|
|
init_image=image,
|
|
mask_image=mask,
|
|
step_callback=partial(self.dispatch_progress, context, source_node_id),
|
|
**self.dict(
|
|
exclude={"positive_conditioning", "negative_conditioning", "scheduler", "image", "mask"}
|
|
), # Shorthand for passing all of the parameters above manually
|
|
)
|
|
|
|
# Outputs is an infinite iterator that will return a new InvokeAIGeneratorOutput object
|
|
# each time it is called. We only need the first one.
|
|
generator_output = next(outputs)
|
|
|
|
image_dto = context.services.images.create(
|
|
image=generator_output.image,
|
|
image_origin=ResourceOrigin.INTERNAL,
|
|
image_category=ImageCategory.GENERAL,
|
|
session_id=context.graph_execution_state_id,
|
|
node_id=self.id,
|
|
is_intermediate=self.is_intermediate,
|
|
)
|
|
|
|
return ImageOutput(
|
|
image=ImageField(image_name=image_dto.image_name),
|
|
width=image_dto.width,
|
|
height=image_dto.height,
|
|
)
|