Merge branch 'main' into fix/nodes/selective-cache-invalidation

This commit is contained in:
Brandon Rising
2023-09-20 10:41:23 -04:00
58 changed files with 338 additions and 412 deletions

View File

@ -425,12 +425,21 @@ class InvocationContext:
graph_execution_state_id: str
queue_id: str
queue_item_id: int
queue_batch_id: str
def __init__(self, services: InvocationServices, queue_id: str, queue_item_id: int, graph_execution_state_id: str):
def __init__(
self,
services: InvocationServices,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
):
self.services = services
self.graph_execution_state_id = graph_execution_state_id
self.queue_id = queue_id
self.queue_item_id = queue_item_id
self.queue_batch_id = queue_batch_id
class BaseInvocationOutput(BaseModel):

View File

@ -30,6 +30,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
node: dict,
source_node_id: str,
@ -44,6 +45,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
node_id=node.get("id"),
source_node_id=source_node_id,
@ -58,6 +60,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
result: dict,
node: dict,
@ -69,6 +72,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
@ -80,6 +84,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
node: dict,
source_node_id: str,
@ -92,6 +97,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
@ -101,7 +107,13 @@ class EventServiceBase:
)
def emit_invocation_started(
self, queue_id: str, queue_item_id: int, graph_execution_state_id: str, node: dict, source_node_id: str
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
node: dict,
source_node_id: str,
) -> None:
"""Emitted when an invocation has started"""
self.__emit_queue_event(
@ -109,19 +121,23 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
),
)
def emit_graph_execution_complete(self, queue_id: str, queue_item_id: int, graph_execution_state_id: str) -> None:
def emit_graph_execution_complete(
self, queue_id: str, queue_item_id: int, queue_batch_id: str, graph_execution_state_id: str
) -> None:
"""Emitted when a session has completed all invocations"""
self.__emit_queue_event(
event_name="graph_execution_state_complete",
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
),
)
@ -130,6 +146,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
model_name: str,
base_model: BaseModelType,
@ -142,6 +159,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
model_name=model_name,
base_model=base_model,
@ -154,6 +172,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
model_name: str,
base_model: BaseModelType,
@ -167,6 +186,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
model_name=model_name,
base_model=base_model,
@ -182,6 +202,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
error_type: str,
error: str,
@ -192,6 +213,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
error_type=error_type,
error=error,
@ -202,6 +224,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
node_id: str,
error_type: str,
@ -213,6 +236,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
node_id=node_id,
error_type=error_type,
@ -224,6 +248,7 @@ class EventServiceBase:
self,
queue_id: str,
queue_item_id: int,
queue_batch_id: str,
graph_execution_state_id: str,
) -> None:
"""Emitted when a session is canceled"""
@ -232,6 +257,7 @@ class EventServiceBase:
payload=dict(
queue_id=queue_id,
queue_item_id=queue_item_id,
queue_batch_id=queue_batch_id,
graph_execution_state_id=graph_execution_state_id,
),
)

View File

@ -15,6 +15,9 @@ class InvocationQueueItem(BaseModel):
session_queue_item_id: int = Field(
description="The ID of session queue item from which this invocation queue item came"
)
session_queue_batch_id: str = Field(
description="The ID of the session batch from which this invocation queue item came"
)
invoke_all: bool = Field(default=False)
timestamp: float = Field(default_factory=time.time)

View File

@ -18,7 +18,12 @@ class Invoker:
self._start()
def invoke(
self, queue_id: str, queue_item_id: int, graph_execution_state: GraphExecutionState, invoke_all: bool = False
self,
session_queue_id: str,
session_queue_item_id: int,
session_queue_batch_id: str,
graph_execution_state: GraphExecutionState,
invoke_all: bool = False,
) -> Optional[str]:
"""Determines the next node to invoke and enqueues it, preparing if needed.
Returns the id of the queued node, or `None` if there are no nodes left to enqueue."""
@ -34,8 +39,9 @@ class Invoker:
# Queue the invocation
self.services.queue.put(
InvocationQueueItem(
session_queue_item_id=queue_item_id,
session_queue_id=queue_id,
session_queue_id=session_queue_id,
session_queue_item_id=session_queue_item_id,
session_queue_batch_id=session_queue_batch_id,
graph_execution_state_id=graph_execution_state.id,
invocation_id=invocation.id,
invoke_all=invoke_all,

View File

@ -539,6 +539,7 @@ class ModelManagerService(ModelManagerServiceBase):
context.services.events.emit_model_load_completed(
queue_id=context.queue_id,
queue_item_id=context.queue_item_id,
queue_batch_id=context.queue_batch_id,
graph_execution_state_id=context.graph_execution_state_id,
model_name=model_name,
base_model=base_model,
@ -550,6 +551,7 @@ class ModelManagerService(ModelManagerServiceBase):
context.services.events.emit_model_load_started(
queue_id=context.queue_id,
queue_item_id=context.queue_item_id,
queue_batch_id=context.queue_batch_id,
graph_execution_state_id=context.graph_execution_state_id,
model_name=model_name,
base_model=base_model,

View File

@ -57,6 +57,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
except Exception as e:
self.__invoker.services.logger.error("Exception while retrieving session:\n%s" % e)
self.__invoker.services.events.emit_session_retrieval_error(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=queue_item.graph_execution_state_id,
@ -70,6 +71,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
except Exception as e:
self.__invoker.services.logger.error("Exception while retrieving invocation:\n%s" % e)
self.__invoker.services.events.emit_invocation_retrieval_error(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=queue_item.graph_execution_state_id,
@ -84,6 +86,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
# Send starting event
self.__invoker.services.events.emit_invocation_started(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,
@ -106,6 +109,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
graph_execution_state_id=graph_execution_state.id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
queue_batch_id=queue_item.session_queue_batch_id,
)
)
@ -121,6 +125,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
# Send complete event
self.__invoker.services.events.emit_invocation_complete(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,
@ -150,6 +155,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
self.__invoker.services.logger.error("Error while invoking:\n%s" % e)
# Send error event
self.__invoker.services.events.emit_invocation_error(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,
@ -170,14 +176,16 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
if queue_item.invoke_all and not is_complete:
try:
self.__invoker.invoke(
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
session_queue_batch_id=queue_item.session_queue_batch_id,
session_queue_item_id=queue_item.session_queue_item_id,
session_queue_id=queue_item.session_queue_id,
graph_execution_state=graph_execution_state,
invoke_all=True,
)
except Exception as e:
self.__invoker.services.logger.error("Error while invoking:\n%s" % e)
self.__invoker.services.events.emit_invocation_error(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,
@ -188,6 +196,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
)
elif is_complete:
self.__invoker.services.events.emit_graph_execution_complete(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,

View File

@ -102,8 +102,9 @@ class DefaultSessionProcessor(SessionProcessorBase):
self.__queue_item = queue_item
self.__invoker.services.graph_execution_manager.set(queue_item.session)
self.__invoker.invoke(
queue_item_id=queue_item.item_id,
queue_id=queue_item.queue_id,
session_queue_batch_id=queue_item.batch_id,
session_queue_id=queue_item.queue_id,
session_queue_item_id=queue_item.item_id,
graph_execution_state=queue_item.session,
invoke_all=True,
)

View File

@ -562,6 +562,7 @@ class SqliteSessionQueue(SessionQueueBase):
self.__invoker.services.events.emit_session_canceled(
queue_item_id=queue_item.item_id,
queue_id=queue_item.queue_id,
queue_batch_id=queue_item.batch_id,
graph_execution_state_id=queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
@ -604,6 +605,7 @@ class SqliteSessionQueue(SessionQueueBase):
self.__invoker.services.events.emit_session_canceled(
queue_item_id=current_queue_item.item_id,
queue_id=current_queue_item.queue_id,
queue_batch_id=current_queue_item.batch_id,
graph_execution_state_id=current_queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(current_queue_item)
@ -649,6 +651,7 @@ class SqliteSessionQueue(SessionQueueBase):
self.__invoker.services.events.emit_session_canceled(
queue_item_id=current_queue_item.item_id,
queue_id=current_queue_item.queue_id,
queue_batch_id=current_queue_item.batch_id,
graph_execution_state_id=current_queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(current_queue_item)

View File

@ -112,6 +112,7 @@ def stable_diffusion_step_callback(
context.services.events.emit_generator_progress(
queue_id=context.queue_id,
queue_item_id=context.queue_item_id,
queue_batch_id=context.queue_batch_id,
graph_execution_state_id=context.graph_execution_state_id,
node=node,
source_node_id=source_node_id,