feat(nodes): update all invocations to use new invocation context

Update all invocations to use the new context. The changes are all fairly simple, but there are a lot of them.

Supporting minor changes:
- Patch bump for all nodes that use the context
- Update invocation processor to provide new context
- Minor change to `EventServiceBase` to accept a node's ID instead of the dict version of a node
- Minor change to `ModelManagerService` to support the new wrapped context
- Fanagling of imports to avoid circular dependencies
This commit is contained in:
psychedelicious
2024-01-13 23:23:16 +11:00
parent 9bc2d09889
commit 8637c40661
32 changed files with 716 additions and 1191 deletions

View File

@ -1,7 +1,7 @@
import math
import re
from pathlib import Path
from typing import Optional, TypedDict
from typing import TYPE_CHECKING, Optional, TypedDict
import cv2
import numpy as np
@ -13,13 +13,16 @@ from pydantic import field_validator
import invokeai.assets.fonts as font_assets
from invokeai.app.invocations.baseinvocation import (
BaseInvocation,
InvocationContext,
WithMetadata,
invocation,
invocation_output,
)
from invokeai.app.invocations.fields import InputField, OutputField, WithMetadata
from invokeai.app.invocations.primitives import ImageField, ImageOutput
from invokeai.app.services.image_records.image_records_common import ImageCategory, ResourceOrigin
from invokeai.app.invocations.fields import ImageField, InputField, OutputField
from invokeai.app.invocations.primitives import ImageOutput
from invokeai.app.services.image_records.image_records_common import ImageCategory
if TYPE_CHECKING:
from invokeai.app.services.shared.invocation_context import InvocationContext
@invocation_output("face_mask_output")
@ -174,7 +177,7 @@ def prepare_faces_list(
def generate_face_box_mask(
context: InvocationContext,
context: "InvocationContext",
minimum_confidence: float,
x_offset: float,
y_offset: float,
@ -273,7 +276,7 @@ def generate_face_box_mask(
def extract_face(
context: InvocationContext,
context: "InvocationContext",
image: ImageType,
face: FaceResultData,
padding: int,
@ -304,37 +307,37 @@ def extract_face(
# Adjust the crop boundaries to stay within the original image's dimensions
if x_min < 0:
context.services.logger.warning("FaceTools --> -X-axis padding reached image edge.")
context.logger.warning("FaceTools --> -X-axis padding reached image edge.")
x_max -= x_min
x_min = 0
elif x_max > mask.width:
context.services.logger.warning("FaceTools --> +X-axis padding reached image edge.")
context.logger.warning("FaceTools --> +X-axis padding reached image edge.")
x_min -= x_max - mask.width
x_max = mask.width
if y_min < 0:
context.services.logger.warning("FaceTools --> +Y-axis padding reached image edge.")
context.logger.warning("FaceTools --> +Y-axis padding reached image edge.")
y_max -= y_min
y_min = 0
elif y_max > mask.height:
context.services.logger.warning("FaceTools --> -Y-axis padding reached image edge.")
context.logger.warning("FaceTools --> -Y-axis padding reached image edge.")
y_min -= y_max - mask.height
y_max = mask.height
# Ensure the crop is square and adjust the boundaries if needed
if x_max - x_min != crop_size:
context.services.logger.warning("FaceTools --> Limiting x-axis padding to constrain bounding box to a square.")
context.logger.warning("FaceTools --> Limiting x-axis padding to constrain bounding box to a square.")
diff = crop_size - (x_max - x_min)
x_min -= diff // 2
x_max += diff - diff // 2
if y_max - y_min != crop_size:
context.services.logger.warning("FaceTools --> Limiting y-axis padding to constrain bounding box to a square.")
context.logger.warning("FaceTools --> Limiting y-axis padding to constrain bounding box to a square.")
diff = crop_size - (y_max - y_min)
y_min -= diff // 2
y_max += diff - diff // 2
context.services.logger.info(f"FaceTools --> Calculated bounding box (8 multiple): {crop_size}")
context.logger.info(f"FaceTools --> Calculated bounding box (8 multiple): {crop_size}")
# Crop the output image to the specified size with the center of the face mesh as the center.
mask = mask.crop((x_min, y_min, x_max, y_max))
@ -354,7 +357,7 @@ def extract_face(
def get_faces_list(
context: InvocationContext,
context: "InvocationContext",
image: ImageType,
should_chunk: bool,
minimum_confidence: float,
@ -366,7 +369,7 @@ def get_faces_list(
# Generate the face box mask and get the center of the face.
if not should_chunk:
context.services.logger.info("FaceTools --> Attempting full image face detection.")
context.logger.info("FaceTools --> Attempting full image face detection.")
result = generate_face_box_mask(
context=context,
minimum_confidence=minimum_confidence,
@ -378,7 +381,7 @@ def get_faces_list(
draw_mesh=draw_mesh,
)
if should_chunk or len(result) == 0:
context.services.logger.info("FaceTools --> Chunking image (chunk toggled on, or no face found in full image).")
context.logger.info("FaceTools --> Chunking image (chunk toggled on, or no face found in full image).")
width, height = image.size
image_chunks = []
x_offsets = []
@ -397,7 +400,7 @@ def get_faces_list(
x_offsets.append(x)
y_offsets.append(0)
fx += increment
context.services.logger.info(f"FaceTools --> Chunk starting at x = {x}")
context.logger.info(f"FaceTools --> Chunk starting at x = {x}")
elif height > width:
# Portrait - slice the image vertically
fy = 0.0
@ -409,10 +412,10 @@ def get_faces_list(
x_offsets.append(0)
y_offsets.append(y)
fy += increment
context.services.logger.info(f"FaceTools --> Chunk starting at y = {y}")
context.logger.info(f"FaceTools --> Chunk starting at y = {y}")
for idx in range(len(image_chunks)):
context.services.logger.info(f"FaceTools --> Evaluating faces in chunk {idx}")
context.logger.info(f"FaceTools --> Evaluating faces in chunk {idx}")
result = result + generate_face_box_mask(
context=context,
minimum_confidence=minimum_confidence,
@ -426,7 +429,7 @@ def get_faces_list(
if len(result) == 0:
# Give up
context.services.logger.warning(
context.logger.warning(
"FaceTools --> No face detected in chunked input image. Passing through original image."
)
@ -435,7 +438,7 @@ def get_faces_list(
return all_faces
@invocation("face_off", title="FaceOff", tags=["image", "faceoff", "face", "mask"], category="image", version="1.2.0")
@invocation("face_off", title="FaceOff", tags=["image", "faceoff", "face", "mask"], category="image", version="1.2.1")
class FaceOffInvocation(BaseInvocation, WithMetadata):
"""Bound, extract, and mask a face from an image using MediaPipe detection"""
@ -456,7 +459,7 @@ class FaceOffInvocation(BaseInvocation, WithMetadata):
description="Whether to bypass full image face detection and default to image chunking. Chunking will occur if no faces are found in the full image.",
)
def faceoff(self, context: InvocationContext, image: ImageType) -> Optional[ExtractFaceData]:
def faceoff(self, context: "InvocationContext", image: ImageType) -> Optional[ExtractFaceData]:
all_faces = get_faces_list(
context=context,
image=image,
@ -468,11 +471,11 @@ class FaceOffInvocation(BaseInvocation, WithMetadata):
)
if len(all_faces) == 0:
context.services.logger.warning("FaceOff --> No faces detected. Passing through original image.")
context.logger.warning("FaceOff --> No faces detected. Passing through original image.")
return None
if self.face_id > len(all_faces) - 1:
context.services.logger.warning(
context.logger.warning(
f"FaceOff --> Face ID {self.face_id} is outside of the number of faces detected ({len(all_faces)}). Passing through original image."
)
return None
@ -483,8 +486,8 @@ class FaceOffInvocation(BaseInvocation, WithMetadata):
return face_data
def invoke(self, context: InvocationContext) -> FaceOffOutput:
image = context.services.images.get_pil_image(self.image.image_name)
def invoke(self, context) -> FaceOffOutput:
image = context.images.get_pil(self.image.image_name)
result = self.faceoff(context=context, image=image)
if result is None:
@ -498,24 +501,9 @@ class FaceOffInvocation(BaseInvocation, WithMetadata):
x = result["x_min"]
y = result["y_min"]
image_dto = context.services.images.create(
image=result_image,
image_origin=ResourceOrigin.INTERNAL,
image_category=ImageCategory.GENERAL,
node_id=self.id,
session_id=context.graph_execution_state_id,
is_intermediate=self.is_intermediate,
workflow=context.workflow,
)
image_dto = context.images.save(image=result_image)
mask_dto = context.services.images.create(
image=result_mask,
image_origin=ResourceOrigin.INTERNAL,
image_category=ImageCategory.MASK,
node_id=self.id,
session_id=context.graph_execution_state_id,
is_intermediate=self.is_intermediate,
)
mask_dto = context.images.save(image=result_mask, image_category=ImageCategory.MASK)
output = FaceOffOutput(
image=ImageField(image_name=image_dto.image_name),
@ -529,7 +517,7 @@ class FaceOffInvocation(BaseInvocation, WithMetadata):
return output
@invocation("face_mask_detection", title="FaceMask", tags=["image", "face", "mask"], category="image", version="1.2.0")
@invocation("face_mask_detection", title="FaceMask", tags=["image", "face", "mask"], category="image", version="1.2.1")
class FaceMaskInvocation(BaseInvocation, WithMetadata):
"""Face mask creation using mediapipe face detection"""
@ -556,7 +544,7 @@ class FaceMaskInvocation(BaseInvocation, WithMetadata):
raise ValueError('Face IDs must be a comma-separated list of integers (e.g. "1,2,3")')
return v
def facemask(self, context: InvocationContext, image: ImageType) -> FaceMaskResult:
def facemask(self, context: "InvocationContext", image: ImageType) -> FaceMaskResult:
all_faces = get_faces_list(
context=context,
image=image,
@ -578,7 +566,7 @@ class FaceMaskInvocation(BaseInvocation, WithMetadata):
if len(intersected_face_ids) == 0:
id_range_str = ",".join([str(id) for id in id_range])
context.services.logger.warning(
context.logger.warning(
f"Face IDs must be in range of detected faces - requested {self.face_ids}, detected {id_range_str}. Passing through original image."
)
return FaceMaskResult(
@ -613,28 +601,13 @@ class FaceMaskInvocation(BaseInvocation, WithMetadata):
mask=mask_pil,
)
def invoke(self, context: InvocationContext) -> FaceMaskOutput:
image = context.services.images.get_pil_image(self.image.image_name)
def invoke(self, context) -> FaceMaskOutput:
image = context.images.get_pil(self.image.image_name)
result = self.facemask(context=context, image=image)
image_dto = context.services.images.create(
image=result["image"],
image_origin=ResourceOrigin.INTERNAL,
image_category=ImageCategory.GENERAL,
node_id=self.id,
session_id=context.graph_execution_state_id,
is_intermediate=self.is_intermediate,
workflow=context.workflow,
)
image_dto = context.images.save(image=result["image"])
mask_dto = context.services.images.create(
image=result["mask"],
image_origin=ResourceOrigin.INTERNAL,
image_category=ImageCategory.MASK,
node_id=self.id,
session_id=context.graph_execution_state_id,
is_intermediate=self.is_intermediate,
)
mask_dto = context.images.save(image=result["mask"], image_category=ImageCategory.MASK)
output = FaceMaskOutput(
image=ImageField(image_name=image_dto.image_name),
@ -647,7 +620,7 @@ class FaceMaskInvocation(BaseInvocation, WithMetadata):
@invocation(
"face_identifier", title="FaceIdentifier", tags=["image", "face", "identifier"], category="image", version="1.2.0"
"face_identifier", title="FaceIdentifier", tags=["image", "face", "identifier"], category="image", version="1.2.1"
)
class FaceIdentifierInvocation(BaseInvocation, WithMetadata):
"""Outputs an image with detected face IDs printed on each face. For use with other FaceTools."""
@ -661,7 +634,7 @@ class FaceIdentifierInvocation(BaseInvocation, WithMetadata):
description="Whether to bypass full image face detection and default to image chunking. Chunking will occur if no faces are found in the full image.",
)
def faceidentifier(self, context: InvocationContext, image: ImageType) -> ImageType:
def faceidentifier(self, context: "InvocationContext", image: ImageType) -> ImageType:
image = image.copy()
all_faces = get_faces_list(
@ -702,22 +675,10 @@ class FaceIdentifierInvocation(BaseInvocation, WithMetadata):
return image
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get_pil_image(self.image.image_name)
def invoke(self, context) -> ImageOutput:
image = context.images.get_pil(self.image.image_name)
result_image = self.faceidentifier(context=context, image=image)
image_dto = context.services.images.create(
image=result_image,
image_origin=ResourceOrigin.INTERNAL,
image_category=ImageCategory.GENERAL,
node_id=self.id,
session_id=context.graph_execution_state_id,
is_intermediate=self.is_intermediate,
workflow=context.workflow,
)
image_dto = context.images.save(image=result_image)
return ImageOutput(
image=ImageField(image_name=image_dto.image_name),
width=image_dto.width,
height=image_dto.height,
)
return ImageOutput.build(image_dto)