Reduce the number of graph_execution_manager.get(...) calls from the InvocationStatsService.

This commit is contained in:
Ryan Dick 2024-01-11 13:03:03 -05:00 committed by Kent Keirsey
parent ac42513da9
commit aa45d21fd2
3 changed files with 51 additions and 37 deletions

View File

@ -132,7 +132,6 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
source_node_id=source_node_id, source_node_id=source_node_id,
result=outputs.model_dump(), result=outputs.model_dump(),
) )
self.__invoker.services.performance_statistics.log_stats()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -195,6 +194,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
error=traceback.format_exc(), error=traceback.format_exc(),
) )
elif is_complete: elif is_complete:
self.__invoker.services.performance_statistics.log_stats(graph_execution_state.id)
self.__invoker.services.events.emit_graph_execution_complete( self.__invoker.services.events.emit_graph_execution_complete(
queue_batch_id=queue_item.session_queue_batch_id, queue_batch_id=queue_item.session_queue_batch_id,
queue_item_id=queue_item.session_queue_item_id, queue_item_id=queue_item.session_queue_item_id,

View File

@ -67,7 +67,7 @@ class InvocationStatsServiceBase(ABC):
pass pass
@abstractmethod @abstractmethod
def log_stats(self): def log_stats(self, graph_execution_state_id: str):
""" """
Write out the accumulated statistics to the log or somewhere else. Write out the accumulated statistics to the log or somewhere else.
""" """

View File

@ -36,6 +36,9 @@ class InvocationStatsService(InvocationStatsServiceBase):
self._stats[graph_execution_state_id] = GraphExecutionStats() self._stats[graph_execution_state_id] = GraphExecutionStats()
self._cache_stats[graph_execution_state_id] = CacheStats() self._cache_stats[graph_execution_state_id] = CacheStats()
# Prune stale stats. There should be none since we're starting a new graph, but just in case.
self._prune_stale_stats()
# Record state before the invocation. # Record state before the invocation.
start_time = time.time() start_time = time.time()
start_ram = psutil.Process().memory_info().rss start_ram = psutil.Process().memory_info().rss
@ -59,29 +62,47 @@ class InvocationStatsService(InvocationStatsServiceBase):
) )
self._stats[graph_execution_state_id].add_node_execution_stats(node_stats) self._stats[graph_execution_state_id].add_node_execution_stats(node_stats)
def reset_stats(self, graph_execution_id: str): def _prune_stale_stats(self):
try: """Check all graphs being tracked and prune any that have completed/errored.
self._stats.pop(graph_execution_id)
except KeyError:
logger.warning(f"Attempted to clear statistics for unknown graph {graph_execution_id}")
def log_stats(self): This shouldn't be necessary, but we don't have totally robust upstream handling of graph completions/errors, so
completed = set() for now we call this function periodically to prevent them from accumulating.
errored = set() """
for graph_id, _node_log in self._stats.items(): to_prune = []
for graph_execution_state_id in self._stats:
try: try:
current_graph_state = self._invoker.services.graph_execution_manager.get(graph_id) graph_execution_state = self._invoker.services.graph_execution_manager.get(graph_execution_state_id)
except Exception: except Exception:
errored.add(graph_id) # TODO(ryand): What would cause this? Should this exception just be allowed to propagate?
logger.warning(f"Failed to get graph state for {graph_execution_state_id}.")
continue continue
if not current_graph_state.is_complete(): if not graph_execution_state.is_complete():
# The graph is still running, don't prune it.
continue continue
graph_stats = self._stats[graph_id] to_prune.append(graph_execution_state_id)
log = graph_stats.get_pretty_log(graph_id)
for graph_execution_state_id in to_prune:
del self._stats[graph_execution_state_id]
del self._cache_stats[graph_execution_state_id]
if len(to_prune) > 0:
logger.info(f"Pruned stale graph stats for {to_prune}.")
def reset_stats(self, graph_execution_state_id: str):
try:
del self._stats[graph_execution_state_id]
del self._cache_stats[graph_execution_state_id]
except KeyError as e:
logger.warning(f"Attempted to clear statistics for unknown graph {graph_execution_state_id}: {e}.")
def log_stats(self, graph_execution_state_id: str):
graph_stats = self._stats[graph_execution_state_id]
cache_stats = self._cache_stats[graph_execution_state_id]
log = graph_stats.get_pretty_log(graph_execution_state_id)
cache_stats = self._cache_stats[graph_id]
hwm = cache_stats.high_watermark / GB hwm = cache_stats.high_watermark / GB
tot = cache_stats.cache_size / GB tot = cache_stats.cache_size / GB
loaded = sum(list(cache_stats.loaded_model_sizes.values())) / GB loaded = sum(list(cache_stats.loaded_model_sizes.values())) / GB
@ -96,12 +117,5 @@ class InvocationStatsService(InvocationStatsServiceBase):
log += f" Cache high water mark: {hwm:4.2f}/{tot:4.2f}G\n" log += f" Cache high water mark: {hwm:4.2f}/{tot:4.2f}G\n"
logger.info(log) logger.info(log)
completed.add(graph_id) del self._stats[graph_execution_state_id]
del self._cache_stats[graph_execution_state_id]
for graph_id in completed:
del self._stats[graph_id]
del self._cache_stats[graph_id]
for graph_id in errored:
del self._stats[graph_id]
del self._cache_stats[graph_id]