From 883e9973ece161ae46833ba9c45dbf3e4ca12a4e Mon Sep 17 00:00:00 2001 From: Brandon Rising Date: Thu, 21 Sep 2023 11:10:25 -0400 Subject: [PATCH 1/2] When an exception happens within the session processor loop, record and move on --- .../session_processor_default.py | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index b682c7e56c..5f493aa0e2 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -92,30 +92,33 @@ class DefaultSessionProcessor(SessionProcessorBase): self.__invoker.services.logger while not stop_event.is_set(): poll_now_event.clear() + try: + # do not dequeue if there is already a session running + if self.__queue_item is None and resume_event.is_set(): + queue_item = self.__invoker.services.session_queue.dequeue() - # do not dequeue if there is already a session running - if self.__queue_item is None and resume_event.is_set(): - queue_item = self.__invoker.services.session_queue.dequeue() + if queue_item is not None: + self.__invoker.services.logger.debug(f"Executing queue item {queue_item.item_id}") + self.__queue_item = queue_item + self.__invoker.services.graph_execution_manager.set(queue_item.session) + self.__invoker.invoke( + session_queue_batch_id=queue_item.batch_id, + session_queue_id=queue_item.queue_id, + session_queue_item_id=queue_item.item_id, + graph_execution_state=queue_item.session, + invoke_all=True, + ) + queue_item = None - if queue_item is not None: - self.__invoker.services.logger.debug(f"Executing queue item {queue_item.item_id}") - self.__queue_item = queue_item - self.__invoker.services.graph_execution_manager.set(queue_item.session) - self.__invoker.invoke( - session_queue_batch_id=queue_item.batch_id, - session_queue_id=queue_item.queue_id, - session_queue_item_id=queue_item.item_id, - graph_execution_state=queue_item.session, - invoke_all=True, - ) - queue_item = None - - if queue_item is None: - self.__invoker.services.logger.debug("Waiting for next polling interval or event") - poll_now_event.wait(POLLING_INTERVAL) + if queue_item is None: + self.__invoker.services.logger.debug("Waiting for next polling interval or event") + poll_now_event.wait(POLLING_INTERVAL) + continue + except Exception as e: + self.__invoker.services.logger.error(f"Error in session processor: {e}") continue except Exception as e: - self.__invoker.services.logger.error(f"Error in session processor: {e}") + self.__invoker.services.logger.error(f"Fatal Error in session processor: {e}") pass finally: stop_event.clear() From 6cc7b55ec543127450e025e34b6ea533801f615b Mon Sep 17 00:00:00 2001 From: Brandon Rising Date: Thu, 21 Sep 2023 11:18:57 -0400 Subject: [PATCH 2/2] Add wait on exception --- .../app/services/session_processor/session_processor_default.py | 1 + 1 file changed, 1 insertion(+) diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index 5f493aa0e2..fbe754b897 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -116,6 +116,7 @@ class DefaultSessionProcessor(SessionProcessorBase): continue except Exception as e: self.__invoker.services.logger.error(f"Error in session processor: {e}") + poll_now_event.wait(POLLING_INTERVAL) continue except Exception as e: self.__invoker.services.logger.error(f"Fatal Error in session processor: {e}")