When an exception happens within the session processor loop, record and move on

This commit is contained in:
Brandon Rising 2023-09-21 11:10:25 -04:00
parent 9e7d829906
commit 883e9973ec

View File

@ -92,30 +92,33 @@ class DefaultSessionProcessor(SessionProcessorBase):
self.__invoker.services.logger self.__invoker.services.logger
while not stop_event.is_set(): while not stop_event.is_set():
poll_now_event.clear() poll_now_event.clear()
try:
# do not dequeue if there is already a session running
if self.__queue_item is None and resume_event.is_set():
queue_item = self.__invoker.services.session_queue.dequeue()
# do not dequeue if there is already a session running if queue_item is not None:
if self.__queue_item is None and resume_event.is_set(): self.__invoker.services.logger.debug(f"Executing queue item {queue_item.item_id}")
queue_item = self.__invoker.services.session_queue.dequeue() self.__queue_item = queue_item
self.__invoker.services.graph_execution_manager.set(queue_item.session)
self.__invoker.invoke(
session_queue_batch_id=queue_item.batch_id,
session_queue_id=queue_item.queue_id,
session_queue_item_id=queue_item.item_id,
graph_execution_state=queue_item.session,
invoke_all=True,
)
queue_item = None
if queue_item is not None: if queue_item is None:
self.__invoker.services.logger.debug(f"Executing queue item {queue_item.item_id}") self.__invoker.services.logger.debug("Waiting for next polling interval or event")
self.__queue_item = queue_item poll_now_event.wait(POLLING_INTERVAL)
self.__invoker.services.graph_execution_manager.set(queue_item.session) continue
self.__invoker.invoke( except Exception as e:
session_queue_batch_id=queue_item.batch_id, self.__invoker.services.logger.error(f"Error in session processor: {e}")
session_queue_id=queue_item.queue_id,
session_queue_item_id=queue_item.item_id,
graph_execution_state=queue_item.session,
invoke_all=True,
)
queue_item = None
if queue_item is None:
self.__invoker.services.logger.debug("Waiting for next polling interval or event")
poll_now_event.wait(POLLING_INTERVAL)
continue continue
except Exception as e: except Exception as e:
self.__invoker.services.logger.error(f"Error in session processor: {e}") self.__invoker.services.logger.error(f"Fatal Error in session processor: {e}")
pass pass
finally: finally:
stop_event.clear() stop_event.clear()