Merge branch 'psyche/fix/nodes/processor-cpu-usage' into lstein/feat/multi-gpu

This commit is contained in:
Lincoln Stein 2024-03-31 17:05:23 -04:00
commit cef51ad80d

View File

@@ -122,11 +122,17 @@ class DefaultSessionProcessor(SessionProcessorBase):
# Middle processor try block; any unhandled exception is a non-fatal processor error # Middle processor try block; any unhandled exception is a non-fatal processor error
try: try:
# If we are paused, wait for resume event # If we are paused, wait for resume event
if resume_event.is_set(): resume_event.wait()
# Get the next session to process # Get the next session to process
self._queue_item = self._invoker.services.session_queue.dequeue() self._queue_item = self._invoker.services.session_queue.dequeue()
if self._queue_item is not None: if self._queue_item is None:
# The queue was empty, wait for next polling interval or event to try again
self._invoker.services.logger.debug("Waiting for next polling interval or event")
poll_now_event.wait(self._polling_interval)
continue
self._invoker.services.logger.debug(f"Executing queue item {self._queue_item.item_id}") self._invoker.services.logger.debug(f"Executing queue item {self._queue_item.item_id}")
cancel_event.clear() cancel_event.clear()
@@ -140,9 +146,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
# Loop over invocations until the session is complete or canceled # Loop over invocations until the session is complete or canceled
while self._invocation is not None and not cancel_event.is_set(): while self._invocation is not None and not cancel_event.is_set():
# get the source node id to provide to clients (the prepared node id is not as useful) # get the source node id to provide to clients (the prepared node id is not as useful)
source_invocation_id = self._queue_item.session.prepared_source_mapping[ source_invocation_id = self._queue_item.session.prepared_source_mapping[self._invocation.id]
self._invocation.id
]
# Send starting event # Send starting event
self._invoker.services.events.emit_invocation_started( self._invoker.services.events.emit_invocation_started(
@@ -249,9 +253,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
# We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
# we don't care about that - suppress the error. # we don't care about that - suppress the error.
with suppress(GESStatsNotFoundError): with suppress(GESStatsNotFoundError):
self._invoker.services.performance_statistics.log_stats( self._invoker.services.performance_statistics.log_stats(self._queue_item.session.id)
self._queue_item.session.id
)
self._invoker.services.performance_statistics.reset_stats() self._invoker.services.performance_statistics.reset_stats()
# Set the invocation to None to prepare for the next session # Set the invocation to None to prepare for the next session
@@ -259,10 +261,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
else: else:
# Prepare the next invocation # Prepare the next invocation
self._invocation = self._queue_item.session.next() self._invocation = self._queue_item.session.next()
# The session is complete, immediately poll for next session
self._queue_item = None
poll_now_event.set()
else: else:
# The queue was empty, wait for next polling interval or event to try again # The queue was empty, wait for next polling interval or event to try again
self._invoker.services.logger.debug("Waiting for next polling interval or event") self._invoker.services.logger.debug("Waiting for next polling interval or event")