InvokeAI/invokeai/app/invocations/flux_text_encoder.py
2024-08-21 09:09:39 -04:00

84 lines
3.5 KiB
Python

import torch
from transformers import CLIPTextModel, CLIPTokenizer, T5EncoderModel, T5Tokenizer
from invokeai.app.invocations.baseinvocation import BaseInvocation, invocation
from invokeai.app.invocations.fields import FieldDescriptions, Input, InputField
from invokeai.app.invocations.model import CLIPField, T5EncoderField
from invokeai.app.invocations.primitives import ConditioningOutput
from invokeai.app.services.shared.invocation_context import InvocationContext
from invokeai.backend.flux.modules.conditioner import HFEncoder
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningFieldData, FLUXConditioningInfo
@invocation(
"flux_text_encoder",
title="FLUX Text Encoding",
tags=["image"],
category="image",
version="1.0.0",
)
class FluxTextEncoderInvocation(BaseInvocation):
clip: CLIPField = InputField(
title="CLIP",
description=FieldDescriptions.clip,
input=Input.Connection,
)
t5Encoder: T5EncoderField = InputField(
title="T5Encoder",
description=FieldDescriptions.t5Encoder,
input=Input.Connection,
)
positive_prompt: str = InputField(description="Positive prompt for text-to-image generation.")
# TODO(ryand): Should we create a new return type for this invocation? This ConditioningOutput is clearly not
# compatible with other ConditioningOutputs.
@torch.no_grad()
def invoke(self, context: InvocationContext) -> ConditioningOutput:
t5_embeddings, clip_embeddings = self._encode_prompt(context)
conditioning_data = ConditioningFieldData(
conditionings=[FLUXConditioningInfo(clip_embeds=clip_embeddings, t5_embeds=t5_embeddings)]
)
conditioning_name = context.conditioning.save(conditioning_data)
return ConditioningOutput.build(conditioning_name)
def _encode_prompt(self, context: InvocationContext) -> tuple[torch.Tensor, torch.Tensor]:
# TODO: Determine the T5 max sequence length based on the model.
# if self.model == "flux-schnell":
max_seq_len = 256
# # elif self.model == "flux-dev":
# # max_seq_len = 512
# else:
# raise ValueError(f"Unknown model: {self.model}")
# Load CLIP.
clip_tokenizer_info = context.models.load(self.clip.tokenizer)
clip_text_encoder_info = context.models.load(self.clip.text_encoder)
# Load T5.
t5_tokenizer_info = context.models.load(self.t5Encoder.tokenizer)
t5_text_encoder_info = context.models.load(self.t5Encoder.text_encoder)
with (
clip_text_encoder_info as clip_text_encoder,
t5_text_encoder_info as t5_text_encoder,
clip_tokenizer_info as clip_tokenizer,
t5_tokenizer_info as t5_tokenizer,
):
assert isinstance(clip_text_encoder, CLIPTextModel)
assert isinstance(t5_text_encoder, T5EncoderModel)
assert isinstance(clip_tokenizer, CLIPTokenizer)
assert isinstance(t5_tokenizer, T5Tokenizer)
clip_encoder = HFEncoder(clip_text_encoder, clip_tokenizer, True, 77)
t5_encoder = HFEncoder(t5_text_encoder, t5_tokenizer, False, max_seq_len)
prompt = [self.positive_prompt]
prompt_embeds = t5_encoder(prompt)
pooled_prompt_embeds = clip_encoder(prompt)
assert isinstance(prompt_embeds, torch.Tensor)
assert isinstance(pooled_prompt_embeds, torch.Tensor)
return prompt_embeds, pooled_prompt_embeds