import torch from transformers import CLIPTextModel, CLIPTokenizer, T5EncoderModel, T5Tokenizer from invokeai.app.invocations.baseinvocation import BaseInvocation, invocation from invokeai.app.invocations.fields import FieldDescriptions, Input, InputField from invokeai.app.invocations.model import CLIPField, T5EncoderField from invokeai.app.invocations.primitives import ConditioningOutput from invokeai.app.services.shared.invocation_context import InvocationContext from invokeai.backend.flux.modules.conditioner import HFEncoder from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningFieldData, FLUXConditioningInfo @invocation( "flux_text_encoder", title="FLUX Text Encoding", tags=["image"], category="image", version="1.0.0", ) class FluxTextEncoderInvocation(BaseInvocation): clip: CLIPField = InputField( title="CLIP", description=FieldDescriptions.clip, input=Input.Connection, ) t5Encoder: T5EncoderField = InputField( title="T5Encoder", description=FieldDescriptions.t5Encoder, input=Input.Connection, ) positive_prompt: str = InputField(description="Positive prompt for text-to-image generation.") # TODO(ryand): Should we create a new return type for this invocation? This ConditioningOutput is clearly not # compatible with other ConditioningOutputs. @torch.no_grad() def invoke(self, context: InvocationContext) -> ConditioningOutput: t5_embeddings, clip_embeddings = self._encode_prompt(context) conditioning_data = ConditioningFieldData( conditionings=[FLUXConditioningInfo(clip_embeds=clip_embeddings, t5_embeds=t5_embeddings)] ) conditioning_name = context.conditioning.save(conditioning_data) return ConditioningOutput.build(conditioning_name) def _encode_prompt(self, context: InvocationContext) -> tuple[torch.Tensor, torch.Tensor]: # TODO: Determine the T5 max sequence length based on the model. # if self.model == "flux-schnell": max_seq_len = 256 # # elif self.model == "flux-dev": # # max_seq_len = 512 # else: # raise ValueError(f"Unknown model: {self.model}") # Load CLIP. clip_tokenizer_info = context.models.load(self.clip.tokenizer) clip_text_encoder_info = context.models.load(self.clip.text_encoder) # Load T5. t5_tokenizer_info = context.models.load(self.t5Encoder.tokenizer) t5_text_encoder_info = context.models.load(self.t5Encoder.text_encoder) with ( clip_text_encoder_info as clip_text_encoder, t5_text_encoder_info as t5_text_encoder, clip_tokenizer_info as clip_tokenizer, t5_tokenizer_info as t5_tokenizer, ): assert isinstance(clip_text_encoder, CLIPTextModel) assert isinstance(t5_text_encoder, T5EncoderModel) assert isinstance(clip_tokenizer, CLIPTokenizer) assert isinstance(t5_tokenizer, T5Tokenizer) clip_encoder = HFEncoder(clip_text_encoder, clip_tokenizer, True, 77) t5_encoder = HFEncoder(t5_text_encoder, t5_tokenizer, False, max_seq_len) prompt = [self.positive_prompt] prompt_embeds = t5_encoder(prompt) pooled_prompt_embeds = clip_encoder(prompt) assert isinstance(prompt_embeds, torch.Tensor) assert isinstance(pooled_prompt_embeds, torch.Tensor) return prompt_embeds, pooled_prompt_embeds