From ad0bb3f61a6f832ef4d770a51e2339f334c52e9b Mon Sep 17 00:00:00 2001 From: Eugene Date: Tue, 9 May 2023 23:52:07 -0400 Subject: [PATCH] fix: queue error should not crash InvocationProcessor 1. if retrieving an item from the queue raises an exception, the InvocationProcessor thread crashes, but the API continues running in a non-functional state. This fixes the issue 2. when there are no items in the queue, sleep 1 second before checking again. 3. Also ensures the thread isn't crashed if an exception is raised from invoker, and emits the error event Intentionally using base Exceptions because for now we don't know which specific exception to expect. Fixes (sort of)? #3222 --- invokeai/app/services/processor.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/invokeai/app/services/processor.py b/invokeai/app/services/processor.py index 35cbcd5068..5d292af3c9 100644 --- a/invokeai/app/services/processor.py +++ b/invokeai/app/services/processor.py @@ -1,3 +1,4 @@ +import time import traceback from threading import Event, Thread, BoundedSemaphore @@ -6,6 +7,7 @@ from .invocation_queue import InvocationQueueItem from .invoker import InvocationProcessorABC, Invoker from ..models.exceptions import CanceledException +import invokeai.backend.util.logging as logger class DefaultInvocationProcessor(InvocationProcessorABC): __invoker_thread: Thread __stop_event: Event @@ -34,8 +36,14 @@ class DefaultInvocationProcessor(InvocationProcessorABC): try: self.__threadLimit.acquire() while not stop_event.is_set(): - queue_item: InvocationQueueItem = self.__invoker.services.queue.get() + try: + queue_item: InvocationQueueItem = self.__invoker.services.queue.get() + except Exception as e: + logger.debug("Exception while getting from queue: %s" % e) + if not queue_item: # Probably stopping + # do not hammer the queue + time.sleep(1) continue graph_execution_state = ( @@ -124,7 +132,16 @@ class DefaultInvocationProcessor(InvocationProcessorABC): # Queue any further commands if invoking all is_complete = graph_execution_state.is_complete() if queue_item.invoke_all and not is_complete: - self.__invoker.invoke(graph_execution_state, invoke_all=True) + try: + self.__invoker.invoke(graph_execution_state, invoke_all=True) + except Exception as e: + logger.error("Error while invoking: %s" % e) + self.__invoker.services.events.emit_invocation_error( + graph_execution_state_id=graph_execution_state.id, + node=invocation.dict(), + source_node_id=source_node_id, + error=traceback.format_exc() + ) elif is_complete: self.__invoker.services.events.emit_graph_execution_complete( graph_execution_state.id