feat(nodes): log stats for canceled graphs

When an invocation is canceled, we consider the graph canceled. Log its graph's stats before resetting its graph's stats. No reason to not log these stats.

We also should stop the profiler at this point, because this graph is finished. If we don't stop it manually, it will stop itself and write the profile to disk when it is next started, but the resultant profile will include more than just its target graph.

Now we get both stats and profiles for canceled graphs.
This commit is contained in:
psychedelicious 2024-02-06 17:38:25 +11:00
parent e0e106367d
commit 810fc19e43

View File

@ -157,6 +157,13 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
except CanceledException:
with suppress(GESStatsNotFoundError):
if profiler:
profile_path = profiler.stop()
stats_path = profile_path.with_suffix(".json")
self.__invoker.services.performance_statistics.dump_stats(
graph_execution_state_id=graph_execution_state.id, output_path=stats_path
)
self.__invoker.services.performance_statistics.log_stats(graph_execution_state.id)
self.__invoker.services.performance_statistics.reset_stats(graph_execution_state.id)
pass
@ -213,20 +220,20 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
error=traceback.format_exc(),
)
elif is_complete:
self.__invoker.services.events.emit_graph_execution_complete(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,
)
if profiler:
profile_path = profiler.stop()
stats_path = profile_path.with_suffix(".json")
self.__invoker.services.performance_statistics.dump_stats(
graph_execution_state_id=graph_execution_state.id, output_path=stats_path
)
with suppress(GESStatsNotFoundError):
self.__invoker.services.performance_statistics.log_stats(graph_execution_state.id)
self.__invoker.services.events.emit_graph_execution_complete(
queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id,
queue_id=queue_item.session_queue_id,
graph_execution_state_id=graph_execution_state.id,
)
if profiler:
profile_path = profiler.stop()
stats_path = profile_path.with_suffix(".json")
self.__invoker.services.performance_statistics.dump_stats(
graph_execution_state_id=graph_execution_state.id, output_path=stats_path
)
self.__invoker.services.performance_statistics.reset_stats(graph_execution_state.id)
except KeyboardInterrupt: