InvokeAI/tests/nodes/test_nodes.py

148 lines
4.6 KiB
Python
Raw Normal View History

from typing import Any, Callable, Union
2023-09-11 13:57:41 +00:00
from invokeai.app.invocations.baseinvocation import (
BaseInvocation,
BaseInvocationOutput,
2023-10-18 07:27:29 +00:00
InputField,
InvocationContext,
2023-10-18 07:27:29 +00:00
OutputField,
invocation,
invocation_output,
)
2023-03-03 05:02:15 +00:00
from invokeai.app.invocations.image import ImageField
2023-07-27 14:54:01 +00:00
# Define test invocations before importing anything that uses invocations
@invocation_output("test_list_output")
class ListPassThroughInvocationOutput(BaseInvocationOutput):
2023-11-27 02:02:09 +00:00
collection: list[ImageField] = OutputField(default=[])
2023-07-27 14:54:01 +00:00
@invocation("test_list")
class ListPassThroughInvocation(BaseInvocation):
2023-11-27 02:02:09 +00:00
collection: list[ImageField] = InputField(default=[])
def invoke(self, context: InvocationContext) -> ListPassThroughInvocationOutput:
2023-07-27 14:54:01 +00:00
return ListPassThroughInvocationOutput(collection=self.collection)
@invocation_output("test_prompt_output")
class PromptTestInvocationOutput(BaseInvocationOutput):
2023-10-18 07:27:29 +00:00
prompt: str = OutputField(default="")
@invocation("test_prompt")
class PromptTestInvocation(BaseInvocation):
2023-10-18 07:27:29 +00:00
prompt: str = InputField(default="")
def invoke(self, context: InvocationContext) -> PromptTestInvocationOutput:
2023-07-27 14:54:01 +00:00
return PromptTestInvocationOutput(prompt=self.prompt)
@invocation("test_error")
class ErrorInvocation(BaseInvocation):
def invoke(self, context: InvocationContext) -> PromptTestInvocationOutput:
raise Exception("This invocation is supposed to fail")
2023-07-27 14:54:01 +00:00
@invocation_output("test_image_output")
class ImageTestInvocationOutput(BaseInvocationOutput):
2023-10-18 07:27:29 +00:00
image: ImageField = OutputField()
2023-07-27 14:54:01 +00:00
@invocation("test_text_to_image")
2023-06-29 06:01:17 +00:00
class TextToImageTestInvocation(BaseInvocation):
2023-10-18 07:27:29 +00:00
prompt: str = InputField(default="")
prompt2: str = InputField(default="")
def invoke(self, context: InvocationContext) -> ImageTestInvocationOutput:
return ImageTestInvocationOutput(image=ImageField(image_name=self.id))
2023-07-27 14:54:01 +00:00
@invocation("test_image_to_image")
2023-06-29 06:01:17 +00:00
class ImageToImageTestInvocation(BaseInvocation):
2023-10-18 07:27:29 +00:00
prompt: str = InputField(default="")
image: Union[ImageField, None] = InputField(default=None)
2023-06-29 06:01:17 +00:00
def invoke(self, context: InvocationContext) -> ImageTestInvocationOutput:
return ImageTestInvocationOutput(image=ImageField(image_name=self.id))
2023-07-27 14:54:01 +00:00
@invocation_output("test_prompt_collection_output")
class PromptCollectionTestInvocationOutput(BaseInvocationOutput):
2023-11-27 02:02:09 +00:00
collection: list[str] = OutputField(default=[])
2023-07-27 14:54:01 +00:00
@invocation("test_prompt_collection")
class PromptCollectionTestInvocation(BaseInvocation):
2023-10-18 07:27:29 +00:00
collection: list[str] = InputField()
def invoke(self, context: InvocationContext) -> PromptCollectionTestInvocationOutput:
return PromptCollectionTestInvocationOutput(collection=self.collection.copy())
2023-07-27 14:54:01 +00:00
@invocation_output("test_any_output")
class AnyTypeTestInvocationOutput(BaseInvocationOutput):
2023-10-18 07:27:29 +00:00
value: Any = OutputField()
@invocation("test_any")
class AnyTypeTestInvocation(BaseInvocation):
2023-10-18 07:27:29 +00:00
value: Any = InputField(default=None)
def invoke(self, context: InvocationContext) -> AnyTypeTestInvocationOutput:
return AnyTypeTestInvocationOutput(value=self.value)
@invocation("test_polymorphic")
class PolymorphicStringTestInvocation(BaseInvocation):
2023-10-18 07:27:29 +00:00
value: Union[str, list[str]] = InputField(default="")
def invoke(self, context: InvocationContext) -> PromptCollectionTestInvocationOutput:
if isinstance(self.value, str):
return PromptCollectionTestInvocationOutput(collection=[self.value])
return PromptCollectionTestInvocationOutput(collection=self.value)
2023-08-18 14:57:18 +00:00
# Importing these must happen after test invocations are defined or they won't register
feat: refactor services folder/module structure Refactor services folder/module structure. **Motivation** While working on our services I've repeatedly encountered circular imports and a general lack of clarity regarding where to put things. The structure introduced goes a long way towards resolving those issues, setting us up for a clean structure going forward. **Services** Services are now in their own folder with a few files: - `services/{service_name}/__init__.py`: init as needed, mostly empty now - `services/{service_name}/{service_name}_base.py`: the base class for the service - `services/{service_name}/{service_name}_{impl_type}.py`: the default concrete implementation of the service - typically one of `sqlite`, `default`, or `memory` - `services/{service_name}/{service_name}_common.py`: any common items - models, exceptions, utilities, etc Though it's a bit verbose to have the service name both as the folder name and the prefix for files, I found it is _extremely_ confusing to have all of the base classes just be named `base.py`. So, at the cost of some verbosity when importing things, I've included the service name in the filename. There are some minor logic changes. For example, in `InvocationProcessor`, instead of assigning the model manager service to a variable to be used later in the file, the service is used directly via the `Invoker`. **Shared** Things that are used across disparate services are in `services/shared/`: - `default_graphs.py`: previously in `services/` - `graphs.py`: previously in `services/` - `paginatation`: generic pagination models used in a few services - `sqlite`: the `SqliteDatabase` class, other sqlite-specific things
2023-09-24 08:11:07 +00:00
from invokeai.app.services.events.events_base import EventServiceBase # noqa: E402
from invokeai.app.services.shared.graph import Edge, EdgeConnection # noqa: E402
2023-08-18 00:55:55 +00:00
2023-03-15 06:09:30 +00:00
def create_edge(from_id: str, from_field: str, to_id: str, to_field: str) -> Edge:
return Edge(
2023-07-27 14:54:01 +00:00
source=EdgeConnection(node_id=from_id, field=from_field),
destination=EdgeConnection(node_id=to_id, field=to_field),
)
class TestEvent:
event_name: str
payload: Any
def __init__(self, event_name: str, payload: Any):
self.event_name = event_name
self.payload = payload
2023-07-27 14:54:01 +00:00
class TestEventService(EventServiceBase):
events: list
def __init__(self):
super().__init__()
self.events = []
def dispatch(self, event_name: str, payload: Any) -> None:
pass
2023-07-27 14:54:01 +00:00
def wait_until(condition: Callable[[], bool], timeout: int = 10, interval: float = 0.1) -> None:
import time
2023-07-27 14:54:01 +00:00
start_time = time.time()
while time.time() - start_time < timeout:
if condition():
return
time.sleep(interval)
2023-03-03 05:02:15 +00:00
raise TimeoutError("Condition not met")