From 350feeed566f4635ee7fad7fa7a856ac53df88a7 Mon Sep 17 00:00:00 2001 From: psychedelicious <4822129+psychedelicious@users.noreply.github.com> Date: Fri, 24 May 2024 11:26:34 +1000 Subject: [PATCH] fix(processor): fix race condition related to clearing the queue --- .../session_processor/session_processor_default.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index eec835af87..c87108e5a0 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -19,7 +19,7 @@ from invokeai.app.services.session_processor.session_processor_base import ( OnNonFatalProcessorError, ) from invokeai.app.services.session_processor.session_processor_common import CanceledException -from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem +from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem, SessionQueueItemNotFoundError from invokeai.app.services.shared.graph import NodeInputError from invokeai.app.services.shared.invocation_context import InvocationContextData, build_invocation_context from invokeai.app.util.profiler import Profiler @@ -166,8 +166,11 @@ class DefaultSessionRunner(SessionRunnerBase): graph_execution_state_id=queue_item.session.id, output_path=stats_path ) - # Update the queue item with the completed session - self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session) + try: + # Update the queue item with the completed session. If the queue item has been removed from the queue, + # we'll get a SessionQueueItemNotFoundError and we can ignore it. This can happen if the queue is cleared + # while the session is running. + queue_item = self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session) # TODO(psyche): This feels jumbled - we should review separation of concerns here. # Send complete event. The events service will receive this and update the queue item's status. @@ -186,6 +189,8 @@ class DefaultSessionRunner(SessionRunnerBase): for callback in self._on_after_run_session_callbacks: callback(queue_item=queue_item) + except SessionQueueItemNotFoundError: + pass def _on_before_run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem): """Run before a node is executed""" @@ -349,6 +354,7 @@ class DefaultSessionProcessor(SessionProcessorBase): "failed", "canceled", ]: + self._cancel_event.set() self._poll_now() def resume(self) -> SessionProcessorStatus: