InvokeAI/invokeai/app/invocations/model.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

440 lines
15 KiB
Python
Raw Normal View History

2023-05-29 23:12:33 +00:00
import copy
2023-08-17 22:45:25 +00:00
from typing import List, Literal, Optional
from pydantic import BaseModel, Field
2023-06-11 03:12:21 +00:00
from ...backend.model_management import BaseModelType, ModelType, SubModelType
from .baseinvocation import (
BaseInvocation,
BaseInvocationOutput,
FieldDescriptions,
InputField,
Input,
InvocationContext,
OutputField,
UIType,
tags,
title,
)
# Reference to a model: its name, base model and model type, plus an optional
# submodel selector (left as None when no specific submodel is addressed).
class ModelInfo(BaseModel):
    model_name: str = Field(description="Info to load submodel")
    base_model: BaseModelType = Field(description="Base model")
    model_type: ModelType = Field(description="Info to load submodel")
    submodel: Optional[SubModelType] = Field(
        default=None,
        description="Info to load submodel",
    )
2023-05-29 22:11:00 +00:00
# A ModelInfo that additionally carries the weight with which the LoRA is
# applied.
class LoraInfo(ModelInfo):
    weight: float = Field(
        description="Lora's weight which to use when apply to model",
    )
# Bundle passed between nodes for the UNet: the unet and scheduler submodel
# references, the LoRAs to apply, and the seamless axes (empty by default).
class UNetField(BaseModel):
    unet: ModelInfo = Field(description="Info to load unet submodel")
    scheduler: ModelInfo = Field(description="Info to load scheduler submodel")
    loras: List[LoraInfo] = Field(description="Loras to apply on model loading")
    seamless_axes: List[str] = Field(
        default_factory=list,
        description='Axes("x" and "y") to which apply seamless',
    )
# Bundle passed between nodes for CLIP: tokenizer and text encoder submodel
# references, the number of text-encoder layers to skip, and the LoRAs to apply.
class ClipField(BaseModel):
    tokenizer: ModelInfo = Field(description="Info to load tokenizer submodel")
    text_encoder: ModelInfo = Field(description="Info to load text_encoder submodel")
    skipped_layers: int = Field(
        description="Number of skipped layers in text_encoder",
    )
    loras: List[LoraInfo] = Field(description="Loras to apply on model loading")
2023-07-28 13:46:44 +00:00
# Bundle passed between nodes for the VAE: the vae model reference and the
# seamless axes (empty by default).
class VaeField(BaseModel):
    # TODO: better naming?
    vae: ModelInfo = Field(description="Info to load vae submodel")
    seamless_axes: List[str] = Field(
        default_factory=list,
        description='Axes("x" and "y") to which apply seamless',
    )
class ModelLoaderOutput(BaseInvocationOutput):
    """Model loader output"""

    type: Literal["model_loader_output"] = "model_loader_output"

    # One output per component of the loaded main model.
    unet: UNetField = OutputField(
        description=FieldDescriptions.unet,
        title="UNet",
    )
    clip: ClipField = OutputField(
        description=FieldDescriptions.clip,
        title="CLIP",
    )
    vae: VaeField = OutputField(
        description=FieldDescriptions.vae,
        title="VAE",
    )
2023-07-28 13:46:44 +00:00
class MainModelField(BaseModel):
    """Main model field"""

    # Selection of a main model by name, base model and model type.
    model_name: str = Field(
        description="Name of the model",
    )
    base_model: BaseModelType = Field(
        description="Base model",
    )
    model_type: ModelType = Field(
        description="Model Type",
    )
class LoRAModelField(BaseModel):
    """LoRA model field"""

    # Selection of a LoRA by name and base model.
    model_name: str = Field(
        description="Name of the LoRA model",
    )
    base_model: BaseModelType = Field(
        description="Base model",
    )
2023-07-28 13:46:44 +00:00
@title("Main Model")
@tags("model")
class MainModelLoaderInvocation(BaseInvocation):
    """Loads a main model, outputting its submodels."""

    type: Literal["main_model_loader"] = "main_model_loader"

    # Inputs
    model: MainModelField = InputField(description=FieldDescriptions.main_model, input=Input.Direct)
    # TODO: precision?

    def invoke(self, context: InvocationContext) -> ModelLoaderOutput:
        """Check that the selected main model exists and describe its submodels.

        Returns a ModelLoaderOutput whose unet/clip/vae fields reference the
        UNet, Scheduler, Tokenizer, TextEncoder and Vae submodels of the
        selected model. LoRA lists start empty and no CLIP layers are skipped.

        Raises:
            Exception: if the model manager does not know the selected model.
        """
        base_model = self.model.base_model
        model_name = self.model.model_name
        model_type = ModelType.Main

        # TODO: not found exceptions
        if not context.services.model_manager.model_exists(
            model_name=model_name,
            base_model=base_model,
            model_type=model_type,
        ):
            raise Exception(f"Unknown {base_model} {model_type} model: {model_name}")

        def submodel_info(submodel: SubModelType) -> ModelInfo:
            # Every output references the same model; only the submodel differs.
            return ModelInfo(
                model_name=model_name,
                base_model=base_model,
                model_type=model_type,
                submodel=submodel,
            )

        return ModelLoaderOutput(
            unet=UNetField(
                unet=submodel_info(SubModelType.UNet),
                scheduler=submodel_info(SubModelType.Scheduler),
                loras=[],
            ),
            clip=ClipField(
                tokenizer=submodel_info(SubModelType.Tokenizer),
                text_encoder=submodel_info(SubModelType.TextEncoder),
                loras=[],
                skipped_layers=0,
            ),
            vae=VaeField(
                vae=submodel_info(SubModelType.Vae),
            ),
        )
2023-05-29 23:12:33 +00:00
2023-07-28 13:46:44 +00:00
2023-05-29 23:12:33 +00:00
class LoraLoaderOutput(BaseInvocationOutput):
    """Model loader output"""

    type: Literal["lora_loader_output"] = "lora_loader_output"

    # Both outputs default to None; each is only set when the corresponding
    # input was connected.
    unet: Optional[UNetField] = OutputField(
        default=None,
        description=FieldDescriptions.unet,
        title="UNet",
    )
    clip: Optional[ClipField] = OutputField(
        default=None,
        description=FieldDescriptions.clip,
        title="CLIP",
    )
2023-05-29 23:12:33 +00:00
@title("LoRA")
@tags("lora", "model")
class LoraLoaderInvocation(BaseInvocation):
    """Apply selected lora to unet and text_encoder."""

    type: Literal["lora_loader"] = "lora_loader"

    # Inputs
    lora: LoRAModelField = InputField(description=FieldDescriptions.lora_model, input=Input.Direct, title="LoRA")
    weight: float = InputField(default=0.75, description=FieldDescriptions.lora_weight)
    unet: Optional[UNetField] = InputField(
        default=None, description=FieldDescriptions.unet, input=Input.Connection, title="UNet"
    )
    clip: Optional[ClipField] = InputField(
        default=None, description=FieldDescriptions.clip, input=Input.Connection, title="CLIP"
    )

    def invoke(self, context: InvocationContext) -> LoraLoaderOutput:
        """Append the selected LoRA to copies of the connected unet/clip fields.

        The inputs are deep-copied before being modified, so the fields held by
        this invocation are left untouched. An output stays None when the
        matching input is not connected.

        Raises:
            Exception: if no LoRA is provided, the LoRA is unknown to the model
                manager, or a LoRA with the same name is already present on the
                unet or clip input.
        """
        if self.lora is None:
            raise Exception("No LoRA provided")

        base_model = self.lora.base_model
        lora_name = self.lora.model_name

        if not context.services.model_manager.model_exists(
            base_model=base_model,
            model_name=lora_name,
            model_type=ModelType.Lora,
        ):
            raise Exception(f"Unknown lora name: {lora_name}!")

        if self.unet is not None and any(lora.model_name == lora_name for lora in self.unet.loras):
            raise Exception(f'Lora "{lora_name}" already applied to unet')

        if self.clip is not None and any(lora.model_name == lora_name for lora in self.clip.loras):
            raise Exception(f'Lora "{lora_name}" already applied to clip')

        def new_lora_info() -> LoraInfo:
            # A fresh instance per call so the outputs never share one object.
            return LoraInfo(
                base_model=base_model,
                model_name=lora_name,
                model_type=ModelType.Lora,
                submodel=None,
                weight=self.weight,
            )

        output = LoraLoaderOutput()

        if self.unet is not None:
            output.unet = copy.deepcopy(self.unet)
            output.unet.loras.append(new_lora_info())

        if self.clip is not None:
            output.clip = copy.deepcopy(self.clip)
            output.clip.loras.append(new_lora_info())

        return output
2023-07-31 20:18:02 +00:00
class SDXLLoraLoaderOutput(BaseInvocationOutput):
    """SDXL LoRA Loader Output"""

    type: Literal["sdxl_lora_loader_output"] = "sdxl_lora_loader_output"

    # All outputs default to None; each is only set when the corresponding
    # input was connected.
    unet: Optional[UNetField] = OutputField(
        default=None,
        description=FieldDescriptions.unet,
        title="UNet",
    )
    clip: Optional[ClipField] = OutputField(
        default=None,
        description=FieldDescriptions.clip,
        title="CLIP 1",
    )
    clip2: Optional[ClipField] = OutputField(
        default=None,
        description=FieldDescriptions.clip,
        title="CLIP 2",
    )
@title("SDXL LoRA")
@tags("sdxl", "lora", "model")
class SDXLLoraLoaderInvocation(BaseInvocation):
    """Apply selected lora to unet and text_encoder."""

    type: Literal["sdxl_lora_loader"] = "sdxl_lora_loader"

    # Inputs
    # Declared with InputField (not pydantic's bare Field) so these inputs are
    # described the same way as in every other invocation in this module.
    lora: LoRAModelField = InputField(description=FieldDescriptions.lora_model, input=Input.Direct, title="LoRA")
    weight: float = InputField(default=0.75, description=FieldDescriptions.lora_weight)
    unet: Optional[UNetField] = InputField(
        default=None, description=FieldDescriptions.unet, input=Input.Connection, title="UNET"
    )
    clip: Optional[ClipField] = InputField(
        default=None, description=FieldDescriptions.clip, input=Input.Connection, title="CLIP 1"
    )
    clip2: Optional[ClipField] = InputField(
        default=None, description=FieldDescriptions.clip, input=Input.Connection, title="CLIP 2"
    )

    def invoke(self, context: InvocationContext) -> SDXLLoraLoaderOutput:
        """Append the selected LoRA to copies of the connected unet/clip/clip2 fields.

        The inputs are deep-copied before being modified, so the fields held by
        this invocation are left untouched. An output stays None when the
        matching input is not connected.

        Raises:
            Exception: if no LoRA is provided, the LoRA is unknown to the model
                manager, or a LoRA with the same name is already present on one
                of the connected inputs.
        """
        if self.lora is None:
            raise Exception("No LoRA provided")

        base_model = self.lora.base_model
        lora_name = self.lora.model_name

        if not context.services.model_manager.model_exists(
            base_model=base_model,
            model_name=lora_name,
            model_type=ModelType.Lora,
        ):
            raise Exception(f"Unknown lora name: {lora_name}!")

        if self.unet is not None and any(lora.model_name == lora_name for lora in self.unet.loras):
            raise Exception(f'Lora "{lora_name}" already applied to unet')

        if self.clip is not None and any(lora.model_name == lora_name for lora in self.clip.loras):
            raise Exception(f'Lora "{lora_name}" already applied to clip')

        if self.clip2 is not None and any(lora.model_name == lora_name for lora in self.clip2.loras):
            raise Exception(f'Lora "{lora_name}" already applied to clip2')

        def new_lora_info() -> LoraInfo:
            # A fresh instance per call so the outputs never share one object.
            return LoraInfo(
                base_model=base_model,
                model_name=lora_name,
                model_type=ModelType.Lora,
                submodel=None,
                weight=self.weight,
            )

        output = SDXLLoraLoaderOutput()

        if self.unet is not None:
            output.unet = copy.deepcopy(self.unet)
            output.unet.loras.append(new_lora_info())

        if self.clip is not None:
            output.clip = copy.deepcopy(self.clip)
            output.clip.loras.append(new_lora_info())

        if self.clip2 is not None:
            output.clip2 = copy.deepcopy(self.clip2)
            output.clip2.loras.append(new_lora_info())

        return output
2023-06-30 22:15:04 +00:00
class VAEModelField(BaseModel):
    """Vae model field"""

    # Selection of a VAE by name and base model.
    model_name: str = Field(
        description="Name of the model",
    )
    base_model: BaseModelType = Field(
        description="Base model",
    )
2023-06-30 22:15:04 +00:00
class VaeLoaderOutput(BaseInvocationOutput):
    """Model loader output"""

    type: Literal["vae_loader_output"] = "vae_loader_output"

    # Outputs
    vae: VaeField = OutputField(
        description=FieldDescriptions.vae,
        title="VAE",
    )
2023-06-30 22:15:04 +00:00
@title("VAE")
@tags("vae", "model")
class VaeLoaderInvocation(BaseInvocation):
    """Loads a VAE model, outputting a VaeLoaderOutput"""

    type: Literal["vae_loader"] = "vae_loader"

    # Inputs
    vae_model: VAEModelField = InputField(
        description=FieldDescriptions.vae_model, input=Input.Direct, ui_type=UIType.VaeModel, title="VAE"
    )

    def invoke(self, context: InvocationContext) -> VaeLoaderOutput:
        """Check that the selected VAE exists and return a reference to it.

        Returns a VaeLoaderOutput wrapping a ModelInfo of type ModelType.Vae
        with no submodel set.

        Raises:
            Exception: if the model manager does not know the selected VAE.
        """
        base_model = self.vae_model.base_model
        model_name = self.vae_model.model_name
        model_type = ModelType.Vae

        if not context.services.model_manager.model_exists(
            base_model=base_model,
            model_name=model_name,
            model_type=model_type,
        ):
            raise Exception(f"Unknown vae name: {model_name}!")

        return VaeLoaderOutput(
            vae=VaeField(
                vae=ModelInfo(
                    model_name=model_name,
                    base_model=base_model,
                    model_type=model_type,
                )
            )
        )
2023-08-27 18:13:00 +00:00
class SeamlessModeOutput(BaseInvocationOutput):
    """Modified Seamless Model output"""

    type: Literal["seamless_output"] = "seamless_output"

    # Outputs
    unet: UNetField = OutputField(
        description=FieldDescriptions.unet,
        title="UNet",
    )
    vae: VaeField = OutputField(
        description=FieldDescriptions.vae,
        title="VAE",
    )
2023-08-27 18:13:00 +00:00
@title("Seamless")
@tags("seamless", "model")
class SeamlessModeInvocation(BaseInvocation):
    """Apply seamless mode to unet."""

    type: Literal["seamless"] = "seamless"

    # Inputs
    unet: UNetField = InputField(description=FieldDescriptions.unet, input=Input.Connection, title="UNet")
    vae_model: VAEModelField = InputField(
        description=FieldDescriptions.vae_model, input=Input.Direct, ui_type=UIType.VaeModel, title="VAE"
    )
    seamless_y: bool = InputField(default=True, input=Input.Any, description="Specify whether Y axis is seamless")
    seamless_x: bool = InputField(default=True, input=Input.Any, description="Specify whether X axis is seamless")

    def invoke(self, context: InvocationContext) -> SeamlessModeOutput:
        """Return the unet and the selected VAE with the seamless axes set.

        The unet input is deep-copied before its axes are overwritten. The VAE
        output is built from ``vae_model``; this invocation has no ``vae``
        attribute, so it cannot be copied from one.

        Raises:
            Exception: if the model manager does not know the selected VAE.
        """
        unet = copy.deepcopy(self.unet)

        vae_base_model = self.vae_model.base_model
        vae_model_name = self.vae_model.model_name
        if not context.services.model_manager.model_exists(
            base_model=vae_base_model,
            model_name=vae_model_name,
            model_type=ModelType.Vae,
        ):
            raise Exception(f"Unknown vae name: {vae_model_name}!")

        vae = VaeField(
            vae=ModelInfo(
                model_name=vae_model_name,
                base_model=vae_base_model,
                model_type=ModelType.Vae,
            )
        )

        # Conditionally include 'x' and 'y' based on seamless_x and seamless_y
        seamless_axes_list = []
        if self.seamless_x:
            seamless_axes_list.append("x")
        if self.seamless_y:
            seamless_axes_list.append("y")

        unet.seamless_axes = seamless_axes_list
        # Separate list object so later edits to one output do not leak into the other.
        vae.seamless_axes = list(seamless_axes_list)

        return SeamlessModeOutput(unet=unet, vae=vae)