mirror of
https://github.com/invoke-ai/InvokeAI
synced 2024-08-30 20:32:17 +00:00
Apply black
This commit is contained in:
@ -42,26 +42,24 @@ def simple_graph():
|
||||
def mock_services() -> InvocationServices:
|
||||
# NOTE: none of these are actually called by the test invocations
|
||||
return InvocationServices(
|
||||
model_manager = None, # type: ignore
|
||||
events = TestEventService(),
|
||||
logger = None, # type: ignore
|
||||
images = None, # type: ignore
|
||||
latents = None, # type: ignore
|
||||
boards = None, # type: ignore
|
||||
board_images = None, # type: ignore
|
||||
queue = MemoryInvocationQueue(),
|
||||
graph_library=SqliteItemStorage[LibraryGraph](
|
||||
filename=sqlite_memory, table_name="graphs"
|
||||
model_manager=None, # type: ignore
|
||||
events=TestEventService(),
|
||||
logger=None, # type: ignore
|
||||
images=None, # type: ignore
|
||||
latents=None, # type: ignore
|
||||
boards=None, # type: ignore
|
||||
board_images=None, # type: ignore
|
||||
queue=MemoryInvocationQueue(),
|
||||
graph_library=SqliteItemStorage[LibraryGraph](filename=sqlite_memory, table_name="graphs"),
|
||||
graph_execution_manager=SqliteItemStorage[GraphExecutionState](
|
||||
filename=sqlite_memory, table_name="graph_executions"
|
||||
),
|
||||
graph_execution_manager = SqliteItemStorage[GraphExecutionState](filename = sqlite_memory, table_name = 'graph_executions'),
|
||||
processor = DefaultInvocationProcessor(),
|
||||
configuration = None, # type: ignore
|
||||
processor=DefaultInvocationProcessor(),
|
||||
configuration=None, # type: ignore
|
||||
)
|
||||
|
||||
|
||||
def invoke_next(
|
||||
g: GraphExecutionState, services: InvocationServices
|
||||
) -> tuple[BaseInvocation, BaseInvocationOutput]:
|
||||
def invoke_next(g: GraphExecutionState, services: InvocationServices) -> tuple[BaseInvocation, BaseInvocationOutput]:
|
||||
n = g.next()
|
||||
if n is None:
|
||||
return (None, None)
|
||||
@ -130,9 +128,7 @@ def test_graph_state_expands_iterator(mock_services):
|
||||
def test_graph_state_collects(mock_services):
|
||||
graph = Graph()
|
||||
test_prompts = ["Banana sushi", "Cat sushi"]
|
||||
graph.add_node(
|
||||
PromptCollectionTestInvocation(id="1", collection=list(test_prompts))
|
||||
)
|
||||
graph.add_node(PromptCollectionTestInvocation(id="1", collection=list(test_prompts)))
|
||||
graph.add_node(IterateInvocation(id="2"))
|
||||
graph.add_node(PromptTestInvocation(id="3"))
|
||||
graph.add_node(CollectInvocation(id="4"))
|
||||
@ -158,16 +154,10 @@ def test_graph_state_prepares_eagerly(mock_services):
|
||||
graph = Graph()
|
||||
|
||||
test_prompts = ["Banana sushi", "Cat sushi"]
|
||||
graph.add_node(
|
||||
PromptCollectionTestInvocation(
|
||||
id="prompt_collection", collection=list(test_prompts)
|
||||
)
|
||||
)
|
||||
graph.add_node(PromptCollectionTestInvocation(id="prompt_collection", collection=list(test_prompts)))
|
||||
graph.add_node(IterateInvocation(id="iterate"))
|
||||
graph.add_node(PromptTestInvocation(id="prompt_iterated"))
|
||||
graph.add_edge(
|
||||
create_edge("prompt_collection", "collection", "iterate", "collection")
|
||||
)
|
||||
graph.add_edge(create_edge("prompt_collection", "collection", "iterate", "collection"))
|
||||
graph.add_edge(create_edge("iterate", "item", "prompt_iterated", "prompt"))
|
||||
|
||||
# separated, fully-preparable chain of nodes
|
||||
@ -193,21 +183,13 @@ def test_graph_executes_depth_first(mock_services):
|
||||
graph = Graph()
|
||||
|
||||
test_prompts = ["Banana sushi", "Cat sushi"]
|
||||
graph.add_node(
|
||||
PromptCollectionTestInvocation(
|
||||
id="prompt_collection", collection=list(test_prompts)
|
||||
)
|
||||
)
|
||||
graph.add_node(PromptCollectionTestInvocation(id="prompt_collection", collection=list(test_prompts)))
|
||||
graph.add_node(IterateInvocation(id="iterate"))
|
||||
graph.add_node(PromptTestInvocation(id="prompt_iterated"))
|
||||
graph.add_node(PromptTestInvocation(id="prompt_successor"))
|
||||
graph.add_edge(
|
||||
create_edge("prompt_collection", "collection", "iterate", "collection")
|
||||
)
|
||||
graph.add_edge(create_edge("prompt_collection", "collection", "iterate", "collection"))
|
||||
graph.add_edge(create_edge("iterate", "item", "prompt_iterated", "prompt"))
|
||||
graph.add_edge(
|
||||
create_edge("prompt_iterated", "prompt", "prompt_successor", "prompt")
|
||||
)
|
||||
graph.add_edge(create_edge("prompt_iterated", "prompt", "prompt_successor", "prompt"))
|
||||
|
||||
g = GraphExecutionState(graph=graph)
|
||||
n1 = invoke_next(g, mock_services)
|
||||
|
@ -35,20 +35,20 @@ def simple_graph():
|
||||
def mock_services() -> InvocationServices:
|
||||
# NOTE: none of these are actually called by the test invocations
|
||||
return InvocationServices(
|
||||
model_manager = None, # type: ignore
|
||||
events = TestEventService(),
|
||||
logger = None, # type: ignore
|
||||
images = None, # type: ignore
|
||||
latents = None, # type: ignore
|
||||
boards = None, # type: ignore
|
||||
board_images = None, # type: ignore
|
||||
queue = MemoryInvocationQueue(),
|
||||
graph_library=SqliteItemStorage[LibraryGraph](
|
||||
filename=sqlite_memory, table_name="graphs"
|
||||
model_manager=None, # type: ignore
|
||||
events=TestEventService(),
|
||||
logger=None, # type: ignore
|
||||
images=None, # type: ignore
|
||||
latents=None, # type: ignore
|
||||
boards=None, # type: ignore
|
||||
board_images=None, # type: ignore
|
||||
queue=MemoryInvocationQueue(),
|
||||
graph_library=SqliteItemStorage[LibraryGraph](filename=sqlite_memory, table_name="graphs"),
|
||||
graph_execution_manager=SqliteItemStorage[GraphExecutionState](
|
||||
filename=sqlite_memory, table_name="graph_executions"
|
||||
),
|
||||
graph_execution_manager = SqliteItemStorage[GraphExecutionState](filename = sqlite_memory, table_name = 'graph_executions'),
|
||||
processor = DefaultInvocationProcessor(),
|
||||
configuration = None, # type: ignore
|
||||
processor=DefaultInvocationProcessor(),
|
||||
configuration=None, # type: ignore
|
||||
)
|
||||
|
||||
|
||||
|
@ -1,5 +1,21 @@
|
||||
from .test_nodes import ImageToImageTestInvocation, TextToImageTestInvocation, ListPassThroughInvocation, PromptTestInvocation
|
||||
from invokeai.app.services.graph import Edge, Graph, GraphInvocation, InvalidEdgeError, NodeAlreadyInGraphError, NodeNotFoundError, are_connections_compatible, EdgeConnection, CollectInvocation, IterateInvocation
|
||||
from .test_nodes import (
|
||||
ImageToImageTestInvocation,
|
||||
TextToImageTestInvocation,
|
||||
ListPassThroughInvocation,
|
||||
PromptTestInvocation,
|
||||
)
|
||||
from invokeai.app.services.graph import (
|
||||
Edge,
|
||||
Graph,
|
||||
GraphInvocation,
|
||||
InvalidEdgeError,
|
||||
NodeAlreadyInGraphError,
|
||||
NodeNotFoundError,
|
||||
are_connections_compatible,
|
||||
EdgeConnection,
|
||||
CollectInvocation,
|
||||
IterateInvocation,
|
||||
)
|
||||
from invokeai.app.invocations.upscale import ESRGANInvocation
|
||||
from invokeai.app.invocations.image import *
|
||||
from invokeai.app.invocations.math import AddInvocation, SubtractInvocation
|
||||
@ -11,35 +27,38 @@ import pytest
|
||||
# Helpers
|
||||
def create_edge(from_id: str, from_field: str, to_id: str, to_field: str) -> Edge:
|
||||
return Edge(
|
||||
source=EdgeConnection(node_id = from_id, field = from_field),
|
||||
destination=EdgeConnection(node_id = to_id, field = to_field)
|
||||
source=EdgeConnection(node_id=from_id, field=from_field),
|
||||
destination=EdgeConnection(node_id=to_id, field=to_field),
|
||||
)
|
||||
|
||||
|
||||
# Tests
|
||||
def test_connections_are_compatible():
|
||||
from_node = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
from_node = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
from_field = "image"
|
||||
to_node = ESRGANInvocation(id = "2")
|
||||
to_node = ESRGANInvocation(id="2")
|
||||
to_field = "image"
|
||||
|
||||
result = are_connections_compatible(from_node, from_field, to_node, to_field)
|
||||
|
||||
assert result == True
|
||||
|
||||
|
||||
def test_connections_are_incompatible():
|
||||
from_node = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
from_node = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
from_field = "image"
|
||||
to_node = ESRGANInvocation(id = "2")
|
||||
to_node = ESRGANInvocation(id="2")
|
||||
to_field = "strength"
|
||||
|
||||
result = are_connections_compatible(from_node, from_field, to_node, to_field)
|
||||
|
||||
assert result == False
|
||||
|
||||
|
||||
def test_connections_incompatible_with_invalid_fields():
|
||||
from_node = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
from_node = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
from_field = "invalid_field"
|
||||
to_node = ESRGANInvocation(id = "2")
|
||||
to_node = ESRGANInvocation(id="2")
|
||||
to_field = "image"
|
||||
|
||||
# From field is invalid
|
||||
@ -53,138 +72,152 @@ def test_connections_incompatible_with_invalid_fields():
|
||||
result = are_connections_compatible(from_node, from_field, to_node, to_field)
|
||||
assert result == False
|
||||
|
||||
|
||||
def test_graph_can_add_node():
|
||||
g = Graph()
|
||||
n = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.add_node(n)
|
||||
|
||||
assert n.id in g.nodes
|
||||
|
||||
|
||||
def test_graph_fails_to_add_node_with_duplicate_id():
|
||||
g = Graph()
|
||||
n = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.add_node(n)
|
||||
n2 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi the second")
|
||||
n2 = TextToImageTestInvocation(id="1", prompt="Banana sushi the second")
|
||||
|
||||
with pytest.raises(NodeAlreadyInGraphError):
|
||||
g.add_node(n2)
|
||||
|
||||
|
||||
def test_graph_updates_node():
|
||||
g = Graph()
|
||||
n = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.add_node(n)
|
||||
n2 = TextToImageTestInvocation(id = "2", prompt = "Banana sushi the second")
|
||||
n2 = TextToImageTestInvocation(id="2", prompt="Banana sushi the second")
|
||||
g.add_node(n2)
|
||||
|
||||
nu = TextToImageTestInvocation(id = "1", prompt = "Banana sushi updated")
|
||||
nu = TextToImageTestInvocation(id="1", prompt="Banana sushi updated")
|
||||
|
||||
g.update_node("1", nu)
|
||||
|
||||
assert g.nodes["1"].prompt == "Banana sushi updated"
|
||||
|
||||
|
||||
def test_graph_fails_to_update_node_if_type_changes():
|
||||
g = Graph()
|
||||
n = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.add_node(n)
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n2)
|
||||
|
||||
nu = ESRGANInvocation(id = "1")
|
||||
nu = ESRGANInvocation(id="1")
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
g.update_node("1", nu)
|
||||
|
||||
|
||||
def test_graph_allows_non_conflicting_id_change():
|
||||
g = Graph()
|
||||
n = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.add_node(n)
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n2)
|
||||
e1 = create_edge(n.id,"image",n2.id,"image")
|
||||
e1 = create_edge(n.id, "image", n2.id, "image")
|
||||
g.add_edge(e1)
|
||||
|
||||
nu = TextToImageTestInvocation(id = "3", prompt = "Banana sushi")
|
||||
|
||||
nu = TextToImageTestInvocation(id="3", prompt="Banana sushi")
|
||||
g.update_node("1", nu)
|
||||
|
||||
with pytest.raises(NodeNotFoundError):
|
||||
g.get_node("1")
|
||||
|
||||
|
||||
assert g.get_node("3").prompt == "Banana sushi"
|
||||
|
||||
assert len(g.edges) == 1
|
||||
assert Edge(source=EdgeConnection(node_id = "3", field = "image"), destination=EdgeConnection(node_id = "2", field = "image")) in g.edges
|
||||
assert (
|
||||
Edge(source=EdgeConnection(node_id="3", field="image"), destination=EdgeConnection(node_id="2", field="image"))
|
||||
in g.edges
|
||||
)
|
||||
|
||||
|
||||
def test_graph_fails_to_update_node_id_if_conflict():
|
||||
g = Graph()
|
||||
n = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.add_node(n)
|
||||
n2 = TextToImageTestInvocation(id = "2", prompt = "Banana sushi the second")
|
||||
n2 = TextToImageTestInvocation(id="2", prompt="Banana sushi the second")
|
||||
g.add_node(n2)
|
||||
|
||||
nu = TextToImageTestInvocation(id = "2", prompt = "Banana sushi")
|
||||
|
||||
nu = TextToImageTestInvocation(id="2", prompt="Banana sushi")
|
||||
with pytest.raises(NodeAlreadyInGraphError):
|
||||
g.update_node("1", nu)
|
||||
|
||||
|
||||
def test_graph_adds_edge():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e = create_edge(n1.id,"image",n2.id,"image")
|
||||
e = create_edge(n1.id, "image", n2.id, "image")
|
||||
|
||||
g.add_edge(e)
|
||||
|
||||
assert e in g.edges
|
||||
|
||||
|
||||
def test_graph_fails_to_add_edge_with_cycle():
|
||||
g = Graph()
|
||||
n1 = ESRGANInvocation(id = "1")
|
||||
n1 = ESRGANInvocation(id="1")
|
||||
g.add_node(n1)
|
||||
e = create_edge(n1.id,"image",n1.id,"image")
|
||||
e = create_edge(n1.id, "image", n1.id, "image")
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e)
|
||||
|
||||
|
||||
def test_graph_fails_to_add_edge_with_long_cycle():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n3 = ESRGANInvocation(id = "3")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
n3 = ESRGANInvocation(id="3")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
e1 = create_edge(n1.id,"image",n2.id,"image")
|
||||
e2 = create_edge(n2.id,"image",n3.id,"image")
|
||||
e3 = create_edge(n3.id,"image",n2.id,"image")
|
||||
e1 = create_edge(n1.id, "image", n2.id, "image")
|
||||
e2 = create_edge(n2.id, "image", n3.id, "image")
|
||||
e3 = create_edge(n3.id, "image", n2.id, "image")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e3)
|
||||
|
||||
|
||||
def test_graph_fails_to_add_edge_with_missing_node_id():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e1 = create_edge("1","image","3","image")
|
||||
e2 = create_edge("3","image","1","image")
|
||||
e1 = create_edge("1", "image", "3", "image")
|
||||
e2 = create_edge("3", "image", "1", "image")
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e1)
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e2)
|
||||
|
||||
|
||||
def test_graph_fails_to_add_edge_when_destination_exists():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n3 = ESRGANInvocation(id = "3")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
n3 = ESRGANInvocation(id="3")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
e1 = create_edge(n1.id,"image",n2.id,"image")
|
||||
e2 = create_edge(n1.id,"image",n3.id,"image")
|
||||
e3 = create_edge(n2.id,"image",n3.id,"image")
|
||||
e1 = create_edge(n1.id, "image", n2.id, "image")
|
||||
e2 = create_edge(n1.id, "image", n3.id, "image")
|
||||
e3 = create_edge(n2.id, "image", n3.id, "image")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
@ -193,208 +226,223 @@ def test_graph_fails_to_add_edge_when_destination_exists():
|
||||
|
||||
def test_graph_fails_to_add_edge_with_mismatched_types():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e1 = create_edge("1","image","2","strength")
|
||||
e1 = create_edge("1", "image", "2", "strength")
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e1)
|
||||
|
||||
|
||||
def test_graph_connects_collector():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = TextToImageTestInvocation(id = "2", prompt = "Banana sushi 2")
|
||||
n3 = CollectInvocation(id = "3")
|
||||
n4 = ListPassThroughInvocation(id = "4")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = TextToImageTestInvocation(id="2", prompt="Banana sushi 2")
|
||||
n3 = CollectInvocation(id="3")
|
||||
n4 = ListPassThroughInvocation(id="4")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
g.add_node(n4)
|
||||
|
||||
e1 = create_edge("1","image","3","item")
|
||||
e2 = create_edge("2","image","3","item")
|
||||
e3 = create_edge("3","collection","4","collection")
|
||||
e1 = create_edge("1", "image", "3", "item")
|
||||
e2 = create_edge("2", "image", "3", "item")
|
||||
e3 = create_edge("3", "collection", "4", "collection")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
g.add_edge(e3)
|
||||
|
||||
|
||||
# TODO: test that derived types mixed with base types are compatible
|
||||
|
||||
|
||||
def test_graph_collector_invalid_with_varying_input_types():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = PromptTestInvocation(id = "2", prompt = "banana sushi 2")
|
||||
n3 = CollectInvocation(id = "3")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = PromptTestInvocation(id="2", prompt="banana sushi 2")
|
||||
n3 = CollectInvocation(id="3")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
|
||||
e1 = create_edge("1","image","3","item")
|
||||
e2 = create_edge("2","prompt","3","item")
|
||||
e1 = create_edge("1", "image", "3", "item")
|
||||
e2 = create_edge("2", "prompt", "3", "item")
|
||||
g.add_edge(e1)
|
||||
|
||||
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e2)
|
||||
|
||||
|
||||
def test_graph_collector_invalid_with_varying_input_output():
|
||||
g = Graph()
|
||||
n1 = PromptTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = PromptTestInvocation(id = "2", prompt = "Banana sushi 2")
|
||||
n3 = CollectInvocation(id = "3")
|
||||
n4 = ListPassThroughInvocation(id = "4")
|
||||
n1 = PromptTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = PromptTestInvocation(id="2", prompt="Banana sushi 2")
|
||||
n3 = CollectInvocation(id="3")
|
||||
n4 = ListPassThroughInvocation(id="4")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
g.add_node(n4)
|
||||
|
||||
e1 = create_edge("1","prompt","3","item")
|
||||
e2 = create_edge("2","prompt","3","item")
|
||||
e3 = create_edge("3","collection","4","collection")
|
||||
e1 = create_edge("1", "prompt", "3", "item")
|
||||
e2 = create_edge("2", "prompt", "3", "item")
|
||||
e3 = create_edge("3", "collection", "4", "collection")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e3)
|
||||
|
||||
|
||||
def test_graph_collector_invalid_with_non_list_output():
|
||||
g = Graph()
|
||||
n1 = PromptTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = PromptTestInvocation(id = "2", prompt = "Banana sushi 2")
|
||||
n3 = CollectInvocation(id = "3")
|
||||
n4 = PromptTestInvocation(id = "4")
|
||||
n1 = PromptTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = PromptTestInvocation(id="2", prompt="Banana sushi 2")
|
||||
n3 = CollectInvocation(id="3")
|
||||
n4 = PromptTestInvocation(id="4")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
g.add_node(n4)
|
||||
|
||||
e1 = create_edge("1","prompt","3","item")
|
||||
e2 = create_edge("2","prompt","3","item")
|
||||
e3 = create_edge("3","collection","4","prompt")
|
||||
e1 = create_edge("1", "prompt", "3", "item")
|
||||
e2 = create_edge("2", "prompt", "3", "item")
|
||||
e3 = create_edge("3", "collection", "4", "prompt")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e3)
|
||||
|
||||
|
||||
def test_graph_connects_iterator():
|
||||
g = Graph()
|
||||
n1 = ListPassThroughInvocation(id = "1")
|
||||
n2 = IterateInvocation(id = "2")
|
||||
n3 = ImageToImageTestInvocation(id = "3", prompt = "Banana sushi")
|
||||
n1 = ListPassThroughInvocation(id="1")
|
||||
n2 = IterateInvocation(id="2")
|
||||
n3 = ImageToImageTestInvocation(id="3", prompt="Banana sushi")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
|
||||
e1 = create_edge("1","collection","2","collection")
|
||||
e2 = create_edge("2","item","3","image")
|
||||
e1 = create_edge("1", "collection", "2", "collection")
|
||||
e2 = create_edge("2", "item", "3", "image")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
|
||||
|
||||
# TODO: TEST INVALID ITERATOR SCENARIOS
|
||||
|
||||
|
||||
def test_graph_iterator_invalid_if_multiple_inputs():
|
||||
g = Graph()
|
||||
n1 = ListPassThroughInvocation(id = "1")
|
||||
n2 = IterateInvocation(id = "2")
|
||||
n3 = ImageToImageTestInvocation(id = "3", prompt = "Banana sushi")
|
||||
n4 = ListPassThroughInvocation(id = "4")
|
||||
n1 = ListPassThroughInvocation(id="1")
|
||||
n2 = IterateInvocation(id="2")
|
||||
n3 = ImageToImageTestInvocation(id="3", prompt="Banana sushi")
|
||||
n4 = ListPassThroughInvocation(id="4")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
g.add_node(n4)
|
||||
|
||||
e1 = create_edge("1","collection","2","collection")
|
||||
e2 = create_edge("2","item","3","image")
|
||||
e3 = create_edge("4","collection","2","collection")
|
||||
e1 = create_edge("1", "collection", "2", "collection")
|
||||
e2 = create_edge("2", "item", "3", "image")
|
||||
e3 = create_edge("4", "collection", "2", "collection")
|
||||
g.add_edge(e1)
|
||||
g.add_edge(e2)
|
||||
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e3)
|
||||
|
||||
|
||||
def test_graph_iterator_invalid_if_input_not_list():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = IterateInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = IterateInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
|
||||
e1 = create_edge("1","collection","2","collection")
|
||||
e1 = create_edge("1", "collection", "2", "collection")
|
||||
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e1)
|
||||
|
||||
|
||||
def test_graph_iterator_invalid_if_output_and_input_types_different():
|
||||
g = Graph()
|
||||
n1 = ListPassThroughInvocation(id = "1")
|
||||
n2 = IterateInvocation(id = "2")
|
||||
n3 = PromptTestInvocation(id = "3", prompt = "Banana sushi")
|
||||
n1 = ListPassThroughInvocation(id="1")
|
||||
n2 = IterateInvocation(id="2")
|
||||
n3 = PromptTestInvocation(id="3", prompt="Banana sushi")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
|
||||
e1 = create_edge("1","collection","2","collection")
|
||||
e2 = create_edge("2","item","3","prompt")
|
||||
e1 = create_edge("1", "collection", "2", "collection")
|
||||
e2 = create_edge("2", "item", "3", "prompt")
|
||||
g.add_edge(e1)
|
||||
|
||||
with pytest.raises(InvalidEdgeError):
|
||||
g.add_edge(e2)
|
||||
|
||||
|
||||
def test_graph_validates():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e1 = create_edge("1","image","2","image")
|
||||
e1 = create_edge("1", "image", "2", "image")
|
||||
g.add_edge(e1)
|
||||
|
||||
assert g.is_valid() == True
|
||||
|
||||
|
||||
def test_graph_invalid_if_edges_reference_missing_nodes():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
g.nodes[n1.id] = n1
|
||||
e1 = create_edge("1","image","2","image")
|
||||
e1 = create_edge("1", "image", "2", "image")
|
||||
g.edges.append(e1)
|
||||
|
||||
assert g.is_valid() == False
|
||||
|
||||
|
||||
def test_graph_invalid_if_subgraph_invalid():
|
||||
g = Graph()
|
||||
n1 = GraphInvocation(id = "1")
|
||||
n1 = GraphInvocation(id="1")
|
||||
n1.graph = Graph()
|
||||
|
||||
n1_1 = TextToImageTestInvocation(id = "2", prompt = "Banana sushi")
|
||||
n1_1 = TextToImageTestInvocation(id="2", prompt="Banana sushi")
|
||||
n1.graph.nodes[n1_1.id] = n1_1
|
||||
e1 = create_edge("1","image","2","image")
|
||||
e1 = create_edge("1", "image", "2", "image")
|
||||
n1.graph.edges.append(e1)
|
||||
|
||||
g.nodes[n1.id] = n1
|
||||
|
||||
assert g.is_valid() == False
|
||||
|
||||
|
||||
def test_graph_invalid_if_has_cycle():
|
||||
g = Graph()
|
||||
n1 = ESRGANInvocation(id = "1")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = ESRGANInvocation(id="1")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.nodes[n1.id] = n1
|
||||
g.nodes[n2.id] = n2
|
||||
e1 = create_edge("1","image","2","image")
|
||||
e2 = create_edge("2","image","1","image")
|
||||
e1 = create_edge("1", "image", "2", "image")
|
||||
e2 = create_edge("2", "image", "1", "image")
|
||||
g.edges.append(e1)
|
||||
g.edges.append(e2)
|
||||
|
||||
assert g.is_valid() == False
|
||||
|
||||
|
||||
def test_graph_invalid_with_invalid_connection():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.nodes[n1.id] = n1
|
||||
g.nodes[n2.id] = n2
|
||||
e1 = create_edge("1","image","2","strength")
|
||||
e1 = create_edge("1", "image", "2", "strength")
|
||||
g.edges.append(e1)
|
||||
|
||||
assert g.is_valid() == False
|
||||
@ -403,47 +451,47 @@ def test_graph_invalid_with_invalid_connection():
|
||||
# TODO: Subgraph operations
|
||||
def test_graph_gets_subgraph_node():
|
||||
g = Graph()
|
||||
n1 = GraphInvocation(id = "1")
|
||||
n1 = GraphInvocation(id="1")
|
||||
n1.graph = Graph()
|
||||
n1.graph.add_node
|
||||
|
||||
n1_1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n1_1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n1.graph.add_node(n1_1)
|
||||
|
||||
g.add_node(n1)
|
||||
|
||||
result = g.get_node('1.1')
|
||||
result = g.get_node("1.1")
|
||||
|
||||
assert result is not None
|
||||
assert result.id == '1'
|
||||
assert result.id == "1"
|
||||
assert result == n1_1
|
||||
|
||||
|
||||
def test_graph_expands_subgraph():
|
||||
g = Graph()
|
||||
n1 = GraphInvocation(id = "1")
|
||||
n1 = GraphInvocation(id="1")
|
||||
n1.graph = Graph()
|
||||
|
||||
n1_1 = AddInvocation(id = "1", a = 1, b = 2)
|
||||
n1_2 = SubtractInvocation(id = "2", b = 3)
|
||||
n1_1 = AddInvocation(id="1", a=1, b=2)
|
||||
n1_2 = SubtractInvocation(id="2", b=3)
|
||||
n1.graph.add_node(n1_1)
|
||||
n1.graph.add_node(n1_2)
|
||||
n1.graph.add_edge(create_edge("1","a","2","a"))
|
||||
n1.graph.add_edge(create_edge("1", "a", "2", "a"))
|
||||
|
||||
g.add_node(n1)
|
||||
|
||||
n2 = AddInvocation(id = "2", b = 5)
|
||||
n2 = AddInvocation(id="2", b=5)
|
||||
g.add_node(n2)
|
||||
g.add_edge(create_edge("1.2","a","2","a"))
|
||||
g.add_edge(create_edge("1.2", "a", "2", "a"))
|
||||
|
||||
dg = g.nx_graph_flat()
|
||||
assert set(dg.nodes) == set(['1.1', '1.2', '2'])
|
||||
assert set(dg.edges) == set([('1.1', '1.2'), ('1.2', '2')])
|
||||
assert set(dg.nodes) == set(["1.1", "1.2", "2"])
|
||||
assert set(dg.edges) == set([("1.1", "1.2"), ("1.2", "2")])
|
||||
|
||||
|
||||
def test_graph_subgraph_t2i():
|
||||
g = Graph()
|
||||
n1 = GraphInvocation(id = "1")
|
||||
n1 = GraphInvocation(id="1")
|
||||
|
||||
# Get text to image default graph
|
||||
lg = create_text_to_image()
|
||||
@ -451,28 +499,26 @@ def test_graph_subgraph_t2i():
|
||||
|
||||
g.add_node(n1)
|
||||
|
||||
n2 = ParamIntInvocation(id = "2", a = 512)
|
||||
n3 = ParamIntInvocation(id = "3", a = 256)
|
||||
n2 = ParamIntInvocation(id="2", a=512)
|
||||
n3 = ParamIntInvocation(id="3", a=256)
|
||||
|
||||
g.add_node(n2)
|
||||
g.add_node(n3)
|
||||
|
||||
g.add_edge(create_edge("2","a","1.width","a"))
|
||||
g.add_edge(create_edge("3","a","1.height","a"))
|
||||
|
||||
n4 = ShowImageInvocation(id = "4")
|
||||
g.add_edge(create_edge("2", "a", "1.width", "a"))
|
||||
g.add_edge(create_edge("3", "a", "1.height", "a"))
|
||||
|
||||
n4 = ShowImageInvocation(id="4")
|
||||
g.add_node(n4)
|
||||
g.add_edge(create_edge("1.8","image","4","image"))
|
||||
g.add_edge(create_edge("1.8", "image", "4", "image"))
|
||||
|
||||
# Validate
|
||||
dg = g.nx_graph_flat()
|
||||
assert set(dg.nodes) == set(['1.width', '1.height', '1.seed', '1.3', '1.4', '1.5', '1.6', '1.7', '1.8', '2', '3', '4'])
|
||||
expected_edges = [(f'1.{e.source.node_id}',f'1.{e.destination.node_id}') for e in lg.graph.edges]
|
||||
expected_edges.extend([
|
||||
('2','1.width'),
|
||||
('3','1.height'),
|
||||
('1.8','4')
|
||||
])
|
||||
assert set(dg.nodes) == set(
|
||||
["1.width", "1.height", "1.seed", "1.3", "1.4", "1.5", "1.6", "1.7", "1.8", "2", "3", "4"]
|
||||
)
|
||||
expected_edges = [(f"1.{e.source.node_id}", f"1.{e.destination.node_id}") for e in lg.graph.edges]
|
||||
expected_edges.extend([("2", "1.width"), ("3", "1.height"), ("1.8", "4")])
|
||||
print(expected_edges)
|
||||
print(list(dg.edges))
|
||||
assert set(dg.edges) == set(expected_edges)
|
||||
@ -480,86 +526,90 @@ def test_graph_subgraph_t2i():
|
||||
|
||||
def test_graph_fails_to_get_missing_subgraph_node():
|
||||
g = Graph()
|
||||
n1 = GraphInvocation(id = "1")
|
||||
n1 = GraphInvocation(id="1")
|
||||
n1.graph = Graph()
|
||||
n1.graph.add_node
|
||||
|
||||
n1_1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n1_1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n1.graph.add_node(n1_1)
|
||||
|
||||
g.add_node(n1)
|
||||
|
||||
with pytest.raises(NodeNotFoundError):
|
||||
result = g.get_node('1.2')
|
||||
result = g.get_node("1.2")
|
||||
|
||||
|
||||
def test_graph_fails_to_enumerate_non_subgraph_node():
|
||||
g = Graph()
|
||||
n1 = GraphInvocation(id = "1")
|
||||
n1 = GraphInvocation(id="1")
|
||||
n1.graph = Graph()
|
||||
n1.graph.add_node
|
||||
|
||||
n1_1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n1_1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n1.graph.add_node(n1_1)
|
||||
|
||||
g.add_node(n1)
|
||||
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n2)
|
||||
|
||||
with pytest.raises(NodeNotFoundError):
|
||||
result = g.get_node('2.1')
|
||||
result = g.get_node("2.1")
|
||||
|
||||
|
||||
def test_graph_gets_networkx_graph():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e = create_edge(n1.id,"image",n2.id,"image")
|
||||
e = create_edge(n1.id, "image", n2.id, "image")
|
||||
g.add_edge(e)
|
||||
|
||||
nxg = g.nx_graph()
|
||||
|
||||
assert '1' in nxg.nodes
|
||||
assert '2' in nxg.nodes
|
||||
assert ('1','2') in nxg.edges
|
||||
assert "1" in nxg.nodes
|
||||
assert "2" in nxg.nodes
|
||||
assert ("1", "2") in nxg.edges
|
||||
|
||||
|
||||
# TODO: Graph serializes and deserializes
|
||||
def test_graph_can_serialize():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e = create_edge(n1.id,"image",n2.id,"image")
|
||||
e = create_edge(n1.id, "image", n2.id, "image")
|
||||
g.add_edge(e)
|
||||
|
||||
# Not throwing on this line is sufficient
|
||||
json = g.json()
|
||||
|
||||
|
||||
def test_graph_can_deserialize():
|
||||
g = Graph()
|
||||
n1 = TextToImageTestInvocation(id = "1", prompt = "Banana sushi")
|
||||
n2 = ESRGANInvocation(id = "2")
|
||||
n1 = TextToImageTestInvocation(id="1", prompt="Banana sushi")
|
||||
n2 = ESRGANInvocation(id="2")
|
||||
g.add_node(n1)
|
||||
g.add_node(n2)
|
||||
e = create_edge(n1.id,"image",n2.id,"image")
|
||||
e = create_edge(n1.id, "image", n2.id, "image")
|
||||
g.add_edge(e)
|
||||
|
||||
json = g.json()
|
||||
g2 = Graph.parse_raw(json)
|
||||
|
||||
assert g2 is not None
|
||||
assert g2.nodes['1'] is not None
|
||||
assert g2.nodes['2'] is not None
|
||||
assert g2.nodes["1"] is not None
|
||||
assert g2.nodes["2"] is not None
|
||||
assert len(g2.edges) == 1
|
||||
assert g2.edges[0].source.node_id == '1'
|
||||
assert g2.edges[0].source.field == 'image'
|
||||
assert g2.edges[0].destination.node_id == '2'
|
||||
assert g2.edges[0].destination.field == 'image'
|
||||
assert g2.edges[0].source.node_id == "1"
|
||||
assert g2.edges[0].source.field == "image"
|
||||
assert g2.edges[0].destination.node_id == "2"
|
||||
assert g2.edges[0].destination.field == "image"
|
||||
|
||||
|
||||
def test_graph_can_generate_schema():
|
||||
# Not throwing on this line is sufficient
|
||||
# NOTE: if this test fails, it's PROBABLY because a new invocation type is breaking schema generation
|
||||
schema = Graph.schema_json(indent = 2)
|
||||
schema = Graph.schema_json(indent=2)
|
||||
|
@ -5,79 +5,92 @@ from invokeai.app.services.invocation_services import InvocationServices
|
||||
from pydantic import Field
|
||||
import pytest
|
||||
|
||||
|
||||
# Define test invocations before importing anything that uses invocations
|
||||
class ListPassThroughInvocationOutput(BaseInvocationOutput):
|
||||
type: Literal['test_list_output'] = 'test_list_output'
|
||||
type: Literal["test_list_output"] = "test_list_output"
|
||||
|
||||
collection: list[ImageField] = Field(default_factory=list)
|
||||
|
||||
|
||||
class ListPassThroughInvocation(BaseInvocation):
|
||||
type: Literal['test_list'] = 'test_list'
|
||||
type: Literal["test_list"] = "test_list"
|
||||
|
||||
collection: list[ImageField] = Field(default_factory=list)
|
||||
|
||||
def invoke(self, context: InvocationContext) -> ListPassThroughInvocationOutput:
|
||||
return ListPassThroughInvocationOutput(collection = self.collection)
|
||||
return ListPassThroughInvocationOutput(collection=self.collection)
|
||||
|
||||
|
||||
class PromptTestInvocationOutput(BaseInvocationOutput):
|
||||
type: Literal['test_prompt_output'] = 'test_prompt_output'
|
||||
type: Literal["test_prompt_output"] = "test_prompt_output"
|
||||
|
||||
prompt: str = Field(default="")
|
||||
|
||||
prompt: str = Field(default = "")
|
||||
|
||||
class PromptTestInvocation(BaseInvocation):
|
||||
type: Literal['test_prompt'] = 'test_prompt'
|
||||
type: Literal["test_prompt"] = "test_prompt"
|
||||
|
||||
prompt: str = Field(default = "")
|
||||
prompt: str = Field(default="")
|
||||
|
||||
def invoke(self, context: InvocationContext) -> PromptTestInvocationOutput:
|
||||
return PromptTestInvocationOutput(prompt = self.prompt)
|
||||
return PromptTestInvocationOutput(prompt=self.prompt)
|
||||
|
||||
|
||||
class ErrorInvocation(BaseInvocation):
|
||||
type: Literal['test_error'] = 'test_error'
|
||||
type: Literal["test_error"] = "test_error"
|
||||
|
||||
def invoke(self, context: InvocationContext) -> PromptTestInvocationOutput:
|
||||
raise Exception("This invocation is supposed to fail")
|
||||
|
||||
|
||||
class ImageTestInvocationOutput(BaseInvocationOutput):
|
||||
type: Literal['test_image_output'] = 'test_image_output'
|
||||
type: Literal["test_image_output"] = "test_image_output"
|
||||
|
||||
image: ImageField = Field()
|
||||
|
||||
class TextToImageTestInvocation(BaseInvocation):
|
||||
type: Literal['test_text_to_image'] = 'test_text_to_image'
|
||||
|
||||
prompt: str = Field(default = "")
|
||||
class TextToImageTestInvocation(BaseInvocation):
|
||||
type: Literal["test_text_to_image"] = "test_text_to_image"
|
||||
|
||||
prompt: str = Field(default="")
|
||||
|
||||
def invoke(self, context: InvocationContext) -> ImageTestInvocationOutput:
|
||||
return ImageTestInvocationOutput(image=ImageField(image_name=self.id))
|
||||
|
||||
class ImageToImageTestInvocation(BaseInvocation):
|
||||
type: Literal['test_image_to_image'] = 'test_image_to_image'
|
||||
|
||||
prompt: str = Field(default = "")
|
||||
class ImageToImageTestInvocation(BaseInvocation):
|
||||
type: Literal["test_image_to_image"] = "test_image_to_image"
|
||||
|
||||
prompt: str = Field(default="")
|
||||
image: Union[ImageField, None] = Field(default=None)
|
||||
|
||||
def invoke(self, context: InvocationContext) -> ImageTestInvocationOutput:
|
||||
return ImageTestInvocationOutput(image=ImageField(image_name=self.id))
|
||||
|
||||
|
||||
class PromptCollectionTestInvocationOutput(BaseInvocationOutput):
|
||||
type: Literal['test_prompt_collection_output'] = 'test_prompt_collection_output'
|
||||
type: Literal["test_prompt_collection_output"] = "test_prompt_collection_output"
|
||||
collection: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class PromptCollectionTestInvocation(BaseInvocation):
|
||||
type: Literal['test_prompt_collection'] = 'test_prompt_collection'
|
||||
type: Literal["test_prompt_collection"] = "test_prompt_collection"
|
||||
collection: list[str] = Field()
|
||||
|
||||
def invoke(self, context: InvocationContext) -> PromptCollectionTestInvocationOutput:
|
||||
return PromptCollectionTestInvocationOutput(collection=self.collection.copy())
|
||||
|
||||
|
||||
from invokeai.app.services.events import EventServiceBase
|
||||
from invokeai.app.services.graph import Edge, EdgeConnection
|
||||
|
||||
|
||||
def create_edge(from_id: str, from_field: str, to_id: str, to_field: str) -> Edge:
|
||||
return Edge(
|
||||
source=EdgeConnection(node_id = from_id, field = from_field),
|
||||
destination=EdgeConnection(node_id = to_id, field = to_field))
|
||||
source=EdgeConnection(node_id=from_id, field=from_field),
|
||||
destination=EdgeConnection(node_id=to_id, field=to_field),
|
||||
)
|
||||
|
||||
|
||||
class TestEvent:
|
||||
@ -88,6 +101,7 @@ class TestEvent:
|
||||
self.event_name = event_name
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class TestEventService(EventServiceBase):
|
||||
events: list
|
||||
|
||||
@ -98,8 +112,10 @@ class TestEventService(EventServiceBase):
|
||||
def dispatch(self, event_name: str, payload: Any) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def wait_until(condition: Callable[[], bool], timeout: int = 10, interval: float = 0.1) -> None:
|
||||
import time
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
if condition():
|
||||
|
@ -3,110 +3,131 @@ from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class TestModel(BaseModel):
|
||||
id: str = Field(description = "ID")
|
||||
name: str = Field(description = "Name")
|
||||
id: str = Field(description="ID")
|
||||
name: str = Field(description="Name")
|
||||
|
||||
|
||||
def test_sqlite_service_can_create_and_get():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
assert db.get('1') == TestModel(id = '1', name = 'Test')
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
assert db.get("1") == TestModel(id="1", name="Test")
|
||||
|
||||
|
||||
def test_sqlite_service_can_list():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id = '2', name = 'Test'))
|
||||
db.set(TestModel(id = '3', name = 'Test'))
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.set(TestModel(id="2", name="Test"))
|
||||
db.set(TestModel(id="3", name="Test"))
|
||||
results = db.list()
|
||||
assert results.page == 0
|
||||
assert results.pages == 1
|
||||
assert results.per_page == 10
|
||||
assert results.total == 3
|
||||
assert results.items == [TestModel(id = '1', name = 'Test'), TestModel(id = '2', name = 'Test'), TestModel(id = '3', name = 'Test')]
|
||||
assert results.items == [
|
||||
TestModel(id="1", name="Test"),
|
||||
TestModel(id="2", name="Test"),
|
||||
TestModel(id="3", name="Test"),
|
||||
]
|
||||
|
||||
|
||||
def test_sqlite_service_can_delete():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.delete('1')
|
||||
assert db.get('1') is None
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.delete("1")
|
||||
assert db.get("1") is None
|
||||
|
||||
|
||||
def test_sqlite_service_calls_set_callback():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
called = False
|
||||
|
||||
def on_changed(item: TestModel):
|
||||
nonlocal called
|
||||
called = True
|
||||
|
||||
db.on_changed(on_changed)
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
assert called
|
||||
|
||||
|
||||
def test_sqlite_service_calls_delete_callback():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
called = False
|
||||
|
||||
def on_deleted(item_id: str):
|
||||
nonlocal called
|
||||
called = True
|
||||
|
||||
db.on_deleted(on_deleted)
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.delete('1')
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.delete("1")
|
||||
assert called
|
||||
|
||||
|
||||
def test_sqlite_service_can_list_with_pagination():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id = '2', name = 'Test'))
|
||||
db.set(TestModel(id = '3', name = 'Test'))
|
||||
results = db.list(page = 0, per_page = 2)
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.set(TestModel(id="2", name="Test"))
|
||||
db.set(TestModel(id="3", name="Test"))
|
||||
results = db.list(page=0, per_page=2)
|
||||
assert results.page == 0
|
||||
assert results.pages == 2
|
||||
assert results.per_page == 2
|
||||
assert results.total == 3
|
||||
assert results.items == [TestModel(id = '1', name = 'Test'), TestModel(id = '2', name = 'Test')]
|
||||
assert results.items == [TestModel(id="1", name="Test"), TestModel(id="2", name="Test")]
|
||||
|
||||
|
||||
def test_sqlite_service_can_list_with_pagination_and_offset():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id = '2', name = 'Test'))
|
||||
db.set(TestModel(id = '3', name = 'Test'))
|
||||
results = db.list(page = 1, per_page = 2)
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.set(TestModel(id="2", name="Test"))
|
||||
db.set(TestModel(id="3", name="Test"))
|
||||
results = db.list(page=1, per_page=2)
|
||||
assert results.page == 1
|
||||
assert results.pages == 2
|
||||
assert results.per_page == 2
|
||||
assert results.total == 3
|
||||
assert results.items == [TestModel(id = '3', name = 'Test')]
|
||||
assert results.items == [TestModel(id="3", name="Test")]
|
||||
|
||||
|
||||
def test_sqlite_service_can_search():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id = '2', name = 'Test'))
|
||||
db.set(TestModel(id = '3', name = 'Test'))
|
||||
results = db.search(query = 'Test')
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.set(TestModel(id="2", name="Test"))
|
||||
db.set(TestModel(id="3", name="Test"))
|
||||
results = db.search(query="Test")
|
||||
assert results.page == 0
|
||||
assert results.pages == 1
|
||||
assert results.per_page == 10
|
||||
assert results.total == 3
|
||||
assert results.items == [TestModel(id = '1', name = 'Test'), TestModel(id = '2', name = 'Test'), TestModel(id = '3', name = 'Test')]
|
||||
assert results.items == [
|
||||
TestModel(id="1", name="Test"),
|
||||
TestModel(id="2", name="Test"),
|
||||
TestModel(id="3", name="Test"),
|
||||
]
|
||||
|
||||
|
||||
def test_sqlite_service_can_search_with_pagination():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id = '2', name = 'Test'))
|
||||
db.set(TestModel(id = '3', name = 'Test'))
|
||||
results = db.search(query = 'Test', page = 0, per_page = 2)
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.set(TestModel(id="2", name="Test"))
|
||||
db.set(TestModel(id="3", name="Test"))
|
||||
results = db.search(query="Test", page=0, per_page=2)
|
||||
assert results.page == 0
|
||||
assert results.pages == 2
|
||||
assert results.per_page == 2
|
||||
assert results.total == 3
|
||||
assert results.items == [TestModel(id = '1', name = 'Test'), TestModel(id = '2', name = 'Test')]
|
||||
assert results.items == [TestModel(id="1", name="Test"), TestModel(id="2", name="Test")]
|
||||
|
||||
|
||||
def test_sqlite_service_can_search_with_pagination_and_offset():
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, 'test', 'id')
|
||||
db.set(TestModel(id = '1', name = 'Test'))
|
||||
db.set(TestModel(id = '2', name = 'Test'))
|
||||
db.set(TestModel(id = '3', name = 'Test'))
|
||||
results = db.search(query = 'Test', page = 1, per_page = 2)
|
||||
db = SqliteItemStorage[TestModel](sqlite_memory, "test", "id")
|
||||
db.set(TestModel(id="1", name="Test"))
|
||||
db.set(TestModel(id="2", name="Test"))
|
||||
db.set(TestModel(id="3", name="Test"))
|
||||
results = db.search(query="Test", page=1, per_page=2)
|
||||
assert results.page == 1
|
||||
assert results.pages == 2
|
||||
assert results.per_page == 2
|
||||
assert results.total == 3
|
||||
assert results.items == [TestModel(id = '3', name = 'Test')]
|
||||
assert results.items == [TestModel(id="3", name="Test")]
|
||||
|
@ -5,87 +5,91 @@ import sys
|
||||
from omegaconf import OmegaConf
|
||||
from pathlib import Path
|
||||
|
||||
os.environ['INVOKEAI_ROOT']='/tmp'
|
||||
os.environ["INVOKEAI_ROOT"] = "/tmp"
|
||||
|
||||
from invokeai.app.services.config import InvokeAIAppConfig
|
||||
|
||||
init1 = OmegaConf.create(
|
||||
'''
|
||||
"""
|
||||
InvokeAI:
|
||||
Features:
|
||||
always_use_cpu: false
|
||||
Memory/Performance:
|
||||
max_cache_size: 5
|
||||
tiled_decode: false
|
||||
'''
|
||||
"""
|
||||
)
|
||||
|
||||
init2 = OmegaConf.create(
|
||||
'''
|
||||
"""
|
||||
InvokeAI:
|
||||
Features:
|
||||
always_use_cpu: true
|
||||
Memory/Performance:
|
||||
max_cache_size: 2
|
||||
tiled_decode: true
|
||||
'''
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def test_use_init():
|
||||
# note that we explicitly set omegaconf dict and argv here
|
||||
# so that the values aren't read from ~invokeai/invokeai.yaml and
|
||||
# sys.argv respectively.
|
||||
conf1 = InvokeAIAppConfig.get_config()
|
||||
assert conf1
|
||||
conf1.parse_args(conf=init1,argv=[])
|
||||
conf1.parse_args(conf=init1, argv=[])
|
||||
assert not conf1.tiled_decode
|
||||
assert conf1.max_cache_size==5
|
||||
assert conf1.max_cache_size == 5
|
||||
assert not conf1.always_use_cpu
|
||||
|
||||
conf2 = InvokeAIAppConfig.get_config()
|
||||
assert conf2
|
||||
conf2.parse_args(conf=init2,argv=[])
|
||||
conf2.parse_args(conf=init2, argv=[])
|
||||
assert conf2.tiled_decode
|
||||
assert conf2.max_cache_size==2
|
||||
assert not hasattr(conf2,'invalid_attribute')
|
||||
|
||||
assert conf2.max_cache_size == 2
|
||||
assert not hasattr(conf2, "invalid_attribute")
|
||||
|
||||
|
||||
def test_argv_override():
|
||||
conf = InvokeAIAppConfig.get_config()
|
||||
conf.parse_args(conf=init1,argv=['--always_use_cpu','--max_cache=10'])
|
||||
conf.parse_args(conf=init1, argv=["--always_use_cpu", "--max_cache=10"])
|
||||
assert conf.always_use_cpu
|
||||
assert conf.max_cache_size==10
|
||||
assert conf.outdir==Path('outputs') # this is the default
|
||||
|
||||
assert conf.max_cache_size == 10
|
||||
assert conf.outdir == Path("outputs") # this is the default
|
||||
|
||||
|
||||
def test_env_override():
|
||||
# argv overrides
|
||||
# argv overrides
|
||||
conf = InvokeAIAppConfig()
|
||||
conf.parse_args(conf=init1,argv=['--max_cache=10'])
|
||||
assert conf.always_use_cpu==False
|
||||
os.environ['INVOKEAI_always_use_cpu'] = 'True'
|
||||
conf.parse_args(conf=init1,argv=['--max_cache=10'])
|
||||
assert conf.always_use_cpu==True
|
||||
conf.parse_args(conf=init1, argv=["--max_cache=10"])
|
||||
assert conf.always_use_cpu == False
|
||||
os.environ["INVOKEAI_always_use_cpu"] = "True"
|
||||
conf.parse_args(conf=init1, argv=["--max_cache=10"])
|
||||
assert conf.always_use_cpu == True
|
||||
|
||||
# environment variables should be case insensitive
|
||||
os.environ['InvokeAI_Max_Cache_Size'] = '15'
|
||||
os.environ["InvokeAI_Max_Cache_Size"] = "15"
|
||||
conf = InvokeAIAppConfig()
|
||||
conf.parse_args(conf=init1,argv=[])
|
||||
conf.parse_args(conf=init1, argv=[])
|
||||
assert conf.max_cache_size == 15
|
||||
|
||||
conf = InvokeAIAppConfig()
|
||||
conf.parse_args(conf=init1,argv=['--no-always_use_cpu','--max_cache=10'])
|
||||
assert conf.always_use_cpu==False
|
||||
assert conf.max_cache_size==10
|
||||
conf.parse_args(conf=init1, argv=["--no-always_use_cpu", "--max_cache=10"])
|
||||
assert conf.always_use_cpu == False
|
||||
assert conf.max_cache_size == 10
|
||||
|
||||
conf = InvokeAIAppConfig.get_config(max_cache_size=20)
|
||||
conf.parse_args(conf=init1,argv=[])
|
||||
assert conf.max_cache_size==20
|
||||
conf.parse_args(conf=init1, argv=[])
|
||||
assert conf.max_cache_size == 20
|
||||
|
||||
|
||||
def test_type_coercion():
|
||||
conf = InvokeAIAppConfig().get_config()
|
||||
conf.parse_args(argv=['--root=/tmp/foobar'])
|
||||
assert conf.root==Path('/tmp/foobar')
|
||||
assert isinstance(conf.root,Path)
|
||||
conf = InvokeAIAppConfig.get_config(root='/tmp/different')
|
||||
conf.parse_args(argv=['--root=/tmp/foobar'])
|
||||
assert conf.root==Path('/tmp/different')
|
||||
assert isinstance(conf.root,Path)
|
||||
conf.parse_args(argv=["--root=/tmp/foobar"])
|
||||
assert conf.root == Path("/tmp/foobar")
|
||||
assert isinstance(conf.root, Path)
|
||||
conf = InvokeAIAppConfig.get_config(root="/tmp/different")
|
||||
conf.parse_args(argv=["--root=/tmp/foobar"])
|
||||
assert conf.root == Path("/tmp/different")
|
||||
assert isinstance(conf.root, Path)
|
||||
|
@ -11,6 +11,7 @@ import invokeai.frontend.web.dist as frontend
|
||||
import invokeai.configs as configs
|
||||
import invokeai.app.assets.images as image_assets
|
||||
|
||||
|
||||
class ConfigsTestCase(unittest.TestCase):
|
||||
"""Test the configuration related imports and objects"""
|
||||
|
||||
|
Reference in New Issue
Block a user