From 88e16ce051a4eb6d3692b6b85570ab656aa06593 Mon Sep 17 00:00:00 2001 From: psychedelicious <4822129+psychedelicious@users.noreply.github.com> Date: Tue, 3 Oct 2023 23:35:06 +1100 Subject: [PATCH] fix(nodes): mark session queue items failed on processor error When the processor has an error and it has a queue item, mark that item failed. This addresses processor errors resulting in `in_progress` queue items, which create a soft lock of the processor, requiring the user to cancel the `in_progress` item before anything else processes. --- .../services/session_processor/session_processor_default.py | 5 +++++ invokeai/app/services/session_queue/session_queue_base.py | 2 +- invokeai/app/services/session_queue/session_queue_sqlite.py | 5 +++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index d5d08cc779..065b80e1a9 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -1,3 +1,4 @@ +import traceback from threading import BoundedSemaphore from threading import Event as ThreadEvent from threading import Thread @@ -123,6 +124,10 @@ class DefaultSessionProcessor(SessionProcessorBase): continue except Exception as e: self.__invoker.services.logger.error(f"Error in session processor: {e}") + if queue_item is not None: + self.__invoker.services.session_queue.cancel_queue_item( + queue_item.item_id, error=traceback.format_exc() + ) poll_now_event.wait(POLLING_INTERVAL) continue except Exception as e: diff --git a/invokeai/app/services/session_queue/session_queue_base.py b/invokeai/app/services/session_queue/session_queue_base.py index 0961759baa..cbc16f8283 100644 --- a/invokeai/app/services/session_queue/session_queue_base.py +++ b/invokeai/app/services/session_queue/session_queue_base.py @@ -80,7 +80,7 @@ class SessionQueueBase(ABC): pass @abstractmethod - def cancel_queue_item(self, item_id: int) -> SessionQueueItem: + def cancel_queue_item(self, item_id: int, error: Optional[str] = None) -> SessionQueueItem: """Cancels a session queue item""" pass diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index a13e13050e..19a5346e10 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -555,10 +555,11 @@ class SqliteSessionQueue(SessionQueueBase): self.__lock.release() return PruneResult(deleted=count) - def cancel_queue_item(self, item_id: int) -> SessionQueueItem: + def cancel_queue_item(self, item_id: int, error: Optional[str] = None) -> SessionQueueItem: queue_item = self.get_queue_item(item_id) if queue_item.status not in ["canceled", "failed", "completed"]: - queue_item = self._set_queue_item_status(item_id=item_id, status="canceled") + status = "failed" if error is not None else "canceled" + queue_item = self._set_queue_item_status(item_id=item_id, status=status, error=error) self.__invoker.services.queue.cancel(queue_item.session_id) self.__invoker.services.events.emit_session_canceled( queue_item_id=queue_item.item_id,