fix(processor): fix race condition related to clearing the queue

This commit is contained in:
psychedelicious 2024-05-24 11:26:34 +10:00
parent 169b75b2b7
commit 350feeed56

View File

@ -19,7 +19,7 @@ from invokeai.app.services.session_processor.session_processor_base import (
OnNonFatalProcessorError, OnNonFatalProcessorError,
) )
from invokeai.app.services.session_processor.session_processor_common import CanceledException from invokeai.app.services.session_processor.session_processor_common import CanceledException
from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem, SessionQueueItemNotFoundError
from invokeai.app.services.shared.graph import NodeInputError from invokeai.app.services.shared.graph import NodeInputError
from invokeai.app.services.shared.invocation_context import InvocationContextData, build_invocation_context from invokeai.app.services.shared.invocation_context import InvocationContextData, build_invocation_context
from invokeai.app.util.profiler import Profiler from invokeai.app.util.profiler import Profiler
@ -166,8 +166,11 @@ class DefaultSessionRunner(SessionRunnerBase):
graph_execution_state_id=queue_item.session.id, output_path=stats_path graph_execution_state_id=queue_item.session.id, output_path=stats_path
) )
# Update the queue item with the completed session try:
self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session) # Update the queue item with the completed session. If the queue item has been removed from the queue,
# we'll get a SessionQueueItemNotFoundError and we can ignore it. This can happen if the queue is cleared
# while the session is running.
queue_item = self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session)
# TODO(psyche): This feels jumbled - we should review separation of concerns here. # TODO(psyche): This feels jumbled - we should review separation of concerns here.
# Send complete event. The events service will receive this and update the queue item's status. # Send complete event. The events service will receive this and update the queue item's status.
@ -186,6 +189,8 @@ class DefaultSessionRunner(SessionRunnerBase):
for callback in self._on_after_run_session_callbacks: for callback in self._on_after_run_session_callbacks:
callback(queue_item=queue_item) callback(queue_item=queue_item)
except SessionQueueItemNotFoundError:
pass
def _on_before_run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem): def _on_before_run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem):
"""Run before a node is executed""" """Run before a node is executed"""
@ -349,6 +354,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
"failed", "failed",
"canceled", "canceled",
]: ]:
self._cancel_event.set()
self._poll_now() self._poll_now()
def resume(self) -> SessionProcessorStatus: def resume(self) -> SessionProcessorStatus: