fix(nodes): 100% cpu usage when processor paused

The processor should wait on the resume event instead of repeatedly checking it in a loop.
This commit is contained in:
psychedelicious 2024-04-01 07:45:36 +11:00
parent 1badf0f32f
commit bd9b00a6bf

View File

@ -122,152 +122,149 @@ class DefaultSessionProcessor(SessionProcessorBase):
# Middle processor try block; any unhandled exception is a non-fatal processor error # Middle processor try block; any unhandled exception is a non-fatal processor error
try: try:
# If we are paused, wait for resume event # If we are paused, wait for resume event
if resume_event.is_set(): resume_event.wait()
# Get the next session to process
self._queue_item = self._invoker.services.session_queue.dequeue()
if self._queue_item is not None: # Get the next session to process
self._invoker.services.logger.debug(f"Executing queue item {self._queue_item.item_id}") self._queue_item = self._invoker.services.session_queue.dequeue()
cancel_event.clear()
# If profiling is enabled, start the profiler if self._queue_item is not None:
if self._profiler is not None: self._invoker.services.logger.debug(f"Executing queue item {self._queue_item.item_id}")
self._profiler.start(profile_id=self._queue_item.session_id) cancel_event.clear()
# Prepare invocations and take the first # If profiling is enabled, start the profiler
self._invocation = self._queue_item.session.next() if self._profiler is not None:
self._profiler.start(profile_id=self._queue_item.session_id)
# Loop over invocations until the session is complete or canceled # Prepare invocations and take the first
while self._invocation is not None and not cancel_event.is_set(): self._invocation = self._queue_item.session.next()
# get the source node id to provide to clients (the prepared node id is not as useful)
source_invocation_id = self._queue_item.session.prepared_source_mapping[
self._invocation.id
]
# Send starting event # Loop over invocations until the session is complete or canceled
self._invoker.services.events.emit_invocation_started( while self._invocation is not None and not cancel_event.is_set():
queue_batch_id=self._queue_item.batch_id, # get the source node id to provide to clients (the prepared node id is not as useful)
queue_item_id=self._queue_item.item_id, source_invocation_id = self._queue_item.session.prepared_source_mapping[self._invocation.id]
queue_id=self._queue_item.queue_id,
graph_execution_state_id=self._queue_item.session_id,
node=self._invocation.model_dump(),
source_node_id=source_invocation_id,
)
# Innermost processor try block; any unhandled exception is an invocation error & will fail the graph # Send starting event
try: self._invoker.services.events.emit_invocation_started(
with self._invoker.services.performance_statistics.collect_stats( queue_batch_id=self._queue_item.batch_id,
self._invocation, self._queue_item.session.id queue_item_id=self._queue_item.item_id,
): queue_id=self._queue_item.queue_id,
# Build invocation context (the node-facing API) graph_execution_state_id=self._queue_item.session_id,
data = InvocationContextData( node=self._invocation.model_dump(),
invocation=self._invocation, source_node_id=source_invocation_id,
source_invocation_id=source_invocation_id, )
queue_item=self._queue_item,
)
context = build_invocation_context(
data=data,
services=self._invoker.services,
cancel_event=self._cancel_event,
)
# Invoke the node # Innermost processor try block; any unhandled exception is an invocation error & will fail the graph
outputs = self._invocation.invoke_internal( try:
context=context, services=self._invoker.services with self._invoker.services.performance_statistics.collect_stats(
) self._invocation, self._queue_item.session.id
):
# Save outputs and history # Build invocation context (the node-facing API)
self._queue_item.session.complete(self._invocation.id, outputs) data = InvocationContextData(
invocation=self._invocation,
# Send complete event source_invocation_id=source_invocation_id,
self._invoker.services.events.emit_invocation_complete( queue_item=self._queue_item,
queue_batch_id=self._queue_item.batch_id, )
queue_item_id=self._queue_item.item_id, context = build_invocation_context(
queue_id=self._queue_item.queue_id, data=data,
graph_execution_state_id=self._queue_item.session.id, services=self._invoker.services,
node=self._invocation.model_dump(), cancel_event=self._cancel_event,
source_node_id=source_invocation_id,
result=outputs.model_dump(),
)
except KeyboardInterrupt:
# TODO(MM2): Create an event for this
pass
except CanceledException:
# When the user cancels the graph, we first set the cancel event. The event is checked
# between invocations, in this loop. Some invocations are long-running, and we need to
# be able to cancel them mid-execution.
#
# For example, denoising is a long-running invocation with many steps. A step callback
# is executed after each step. This step callback checks if the canceled event is set,
# then raises a CanceledException to stop execution immediately.
#
# When we get a CanceledException, we don't need to do anything - just pass and let the
# loop go to its next iteration, and the cancel event will be handled correctly.
pass
except Exception as e:
error = traceback.format_exc()
# Save error
self._queue_item.session.set_node_error(self._invocation.id, error)
self._invoker.services.logger.error(
f"Error while invoking session {self._queue_item.session_id}, invocation {self._invocation.id} ({self._invocation.get_type()}):\n{e}"
) )
self._invoker.services.logger.error(error)
# Send error event # Invoke the node
self._invoker.services.events.emit_invocation_error( outputs = self._invocation.invoke_internal(
queue_batch_id=self._queue_item.session_id, context=context, services=self._invoker.services
)
# Save outputs and history
self._queue_item.session.complete(self._invocation.id, outputs)
# Send complete event
self._invoker.services.events.emit_invocation_complete(
queue_batch_id=self._queue_item.batch_id,
queue_item_id=self._queue_item.item_id, queue_item_id=self._queue_item.item_id,
queue_id=self._queue_item.queue_id, queue_id=self._queue_item.queue_id,
graph_execution_state_id=self._queue_item.session.id, graph_execution_state_id=self._queue_item.session.id,
node=self._invocation.model_dump(), node=self._invocation.model_dump(),
source_node_id=source_invocation_id, source_node_id=source_invocation_id,
error_type=e.__class__.__name__, result=outputs.model_dump(),
error=error,
) )
pass
# The session is complete if the all invocations are complete or there was an error except KeyboardInterrupt:
if self._queue_item.session.is_complete() or cancel_event.is_set(): # TODO(MM2): Create an event for this
# Send complete event pass
self._invoker.services.events.emit_graph_execution_complete(
queue_batch_id=self._queue_item.batch_id, except CanceledException:
queue_item_id=self._queue_item.item_id, # When the user cancels the graph, we first set the cancel event. The event is checked
queue_id=self._queue_item.queue_id, # between invocations, in this loop. Some invocations are long-running, and we need to
graph_execution_state_id=self._queue_item.session.id, # be able to cancel them mid-execution.
#
# For example, denoising is a long-running invocation with many steps. A step callback
# is executed after each step. This step callback checks if the canceled event is set,
# then raises a CanceledException to stop execution immediately.
#
# When we get a CanceledException, we don't need to do anything - just pass and let the
# loop go to its next iteration, and the cancel event will be handled correctly.
pass
except Exception as e:
error = traceback.format_exc()
# Save error
self._queue_item.session.set_node_error(self._invocation.id, error)
self._invoker.services.logger.error(
f"Error while invoking session {self._queue_item.session_id}, invocation {self._invocation.id} ({self._invocation.get_type()}):\n{e}"
)
self._invoker.services.logger.error(error)
# Send error event
self._invoker.services.events.emit_invocation_error(
queue_batch_id=self._queue_item.session_id,
queue_item_id=self._queue_item.item_id,
queue_id=self._queue_item.queue_id,
graph_execution_state_id=self._queue_item.session.id,
node=self._invocation.model_dump(),
source_node_id=source_invocation_id,
error_type=e.__class__.__name__,
error=error,
)
pass
                            # The session is complete if all the invocations are complete or there was an error
if self._queue_item.session.is_complete() or cancel_event.is_set():
# Send complete event
self._invoker.services.events.emit_graph_execution_complete(
queue_batch_id=self._queue_item.batch_id,
queue_item_id=self._queue_item.item_id,
queue_id=self._queue_item.queue_id,
graph_execution_state_id=self._queue_item.session.id,
)
# If we are profiling, stop the profiler and dump the profile & stats
if self._profiler:
profile_path = self._profiler.stop()
stats_path = profile_path.with_suffix(".json")
self._invoker.services.performance_statistics.dump_stats(
graph_execution_state_id=self._queue_item.session.id, output_path=stats_path
) )
# If we are profiling, stop the profiler and dump the profile & stats # We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
if self._profiler: # we don't care about that - suppress the error.
profile_path = self._profiler.stop() with suppress(GESStatsNotFoundError):
stats_path = profile_path.with_suffix(".json") self._invoker.services.performance_statistics.log_stats(self._queue_item.session.id)
self._invoker.services.performance_statistics.dump_stats( self._invoker.services.performance_statistics.reset_stats()
graph_execution_state_id=self._queue_item.session.id, output_path=stats_path
)
# We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
# we don't care about that - suppress the error.
with suppress(GESStatsNotFoundError):
self._invoker.services.performance_statistics.log_stats(
self._queue_item.session.id
)
self._invoker.services.performance_statistics.reset_stats()
# Set the invocation to None to prepare for the next session # Set the invocation to None to prepare for the next session
self._invocation = None self._invocation = None
else: else:
# Prepare the next invocation # Prepare the next invocation
self._invocation = self._queue_item.session.next() self._invocation = self._queue_item.session.next()
# The session is complete, immediately poll for next session # The session is complete, immediately poll for next session
self._queue_item = None self._queue_item = None
poll_now_event.set() poll_now_event.set()
else: else:
# The queue was empty, wait for next polling interval or event to try again # The queue was empty, wait for next polling interval or event to try again
self._invoker.services.logger.debug("Waiting for next polling interval or event") self._invoker.services.logger.debug("Waiting for next polling interval or event")
poll_now_event.wait(self._polling_interval) poll_now_event.wait(self._polling_interval)
continue continue
except Exception: except Exception:
# Non-fatal error in processor # Non-fatal error in processor
self._invoker.services.logger.error( self._invoker.services.logger.error(