From 9c926f249f679fbea0082c050974bac254d592e8 Mon Sep 17 00:00:00 2001 From: psychedelicious <4822129+psychedelicious@users.noreply.github.com> Date: Fri, 24 May 2024 11:28:55 +1000 Subject: [PATCH] feat(processor): add debug log stmts to session running callbacks --- .../session_processor_default.py | 50 +++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index c87108e5a0..8cb9216821 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -148,6 +148,10 @@ class DefaultSessionRunner(SessionRunnerBase): def _on_before_run_session(self, queue_item: SessionQueueItem) -> None: """Run before a session is executed""" + self._services.logger.debug( + f"On before run session: queue item {queue_item.item_id}, session {queue_item.session_id}" + ) + # If profiling is enabled, start the profiler if self._profiler is not None: self._profiler.start(profile_id=queue_item.session_id) @@ -158,6 +162,10 @@ class DefaultSessionRunner(SessionRunnerBase): def _on_after_run_session(self, queue_item: SessionQueueItem) -> None: """Run after a session is executed""" + self._services.logger.debug( + f"On after run session: queue item {queue_item.item_id}, session {queue_item.session_id}" + ) + # If we are profiling, stop the profiler and dump the profile & stats if self._profiler is not None: profile_path = self._profiler.stop() @@ -172,29 +180,33 @@ class DefaultSessionRunner(SessionRunnerBase): # while the session is running. queue_item = self._services.session_queue.set_queue_item_session(queue_item.item_id, queue_item.session) - # TODO(psyche): This feels jumbled - we should review separation of concerns here. - # Send complete event. The events service will receive this and update the queue item's status. - self._services.events.emit_graph_execution_complete( - queue_batch_id=queue_item.batch_id, - queue_item_id=queue_item.item_id, - queue_id=queue_item.queue_id, - graph_execution_state_id=queue_item.session.id, - ) + # TODO(psyche): This feels jumbled - we should review separation of concerns here. + # Send complete event. The events service will receive this and update the queue item's status. + self._services.events.emit_graph_execution_complete( + queue_batch_id=queue_item.batch_id, + queue_item_id=queue_item.item_id, + queue_id=queue_item.queue_id, + graph_execution_state_id=queue_item.session.id, + ) - # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor - # we don't care about that - suppress the error. - with suppress(GESStatsNotFoundError): - self._services.performance_statistics.log_stats(queue_item.session.id) - self._services.performance_statistics.reset_stats() + # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor + # we don't care about that - suppress the error. + with suppress(GESStatsNotFoundError): + self._services.performance_statistics.log_stats(queue_item.session.id) + self._services.performance_statistics.reset_stats() - for callback in self._on_after_run_session_callbacks: - callback(queue_item=queue_item) + for callback in self._on_after_run_session_callbacks: + callback(queue_item=queue_item) except SessionQueueItemNotFoundError: pass def _on_before_run_node(self, invocation: BaseInvocation, queue_item: SessionQueueItem): """Run before a node is executed""" + self._services.logger.debug( + f"On before run node: queue item {queue_item.item_id}, session {queue_item.session_id}, node {invocation.id} ({invocation.get_type()})" + ) + # Send starting event self._services.events.emit_invocation_started( queue_batch_id=queue_item.batch_id, @@ -213,6 +225,10 @@ class DefaultSessionRunner(SessionRunnerBase): ): """Run after a node is executed""" + self._services.logger.debug( + f"On after run node: queue item {queue_item.item_id}, session {queue_item.session_id}, node {invocation.id} ({invocation.get_type()})" + ) + # Send complete event on successful runs self._services.events.emit_invocation_complete( queue_batch_id=queue_item.batch_id, @@ -237,6 +253,10 @@ class DefaultSessionRunner(SessionRunnerBase): ): """Run when a node errors""" + self._services.logger.debug( + f"On node error: queue item {queue_item.item_id}, session {queue_item.session_id}, node {invocation.id} ({invocation.get_type()})" + ) + # Node errors do not get the full traceback. Only the queue item gets the full traceback. node_error = f"{error_type}: {error_message}" queue_item.session.set_node_error(invocation.id, node_error)