mirror of
https://github.com/invoke-ai/InvokeAI
synced 2024-08-30 20:32:17 +00:00
tidy(nodes): clean up profiler/stats in processor, better comments
This commit is contained in:
parent
9d27d354cf
commit
78dd460348
@ -37,6 +37,18 @@ class DefaultSessionProcessor(SessionProcessorBase):
|
|||||||
self._thread_semaphore = BoundedSemaphore(thread_limit)
|
self._thread_semaphore = BoundedSemaphore(thread_limit)
|
||||||
self._polling_interval = polling_interval
|
self._polling_interval = polling_interval
|
||||||
|
|
||||||
|
# If profiling is enabled, create a profiler. The same profiler will be used for all sessions. Internally,
|
||||||
|
# the profiler will create a new profile for each session.
|
||||||
|
self._profiler = (
|
||||||
|
Profiler(
|
||||||
|
logger=self._invoker.services.logger,
|
||||||
|
output_dir=self._invoker.services.configuration.profiles_path,
|
||||||
|
prefix=self._invoker.services.configuration.profile_prefix,
|
||||||
|
)
|
||||||
|
if self._invoker.services.configuration.profile_graphs
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
|
||||||
self._thread = Thread(
|
self._thread = Thread(
|
||||||
name="session_processor",
|
name="session_processor",
|
||||||
target=self._process,
|
target=self._process,
|
||||||
@ -95,32 +107,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
|
|||||||
resume_event.set()
|
resume_event.set()
|
||||||
cancel_event.clear()
|
cancel_event.clear()
|
||||||
|
|
||||||
# If profiling is enabled, create a profiler. The same profiler will be used for all sessions. Internally,
|
|
||||||
# the profiler will create a new profile for each session.
|
|
||||||
profiler = (
|
|
||||||
Profiler(
|
|
||||||
logger=self._invoker.services.logger,
|
|
||||||
output_dir=self._invoker.services.configuration.profiles_path,
|
|
||||||
prefix=self._invoker.services.configuration.profile_prefix,
|
|
||||||
)
|
|
||||||
if self._invoker.services.configuration.profile_graphs
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
|
|
||||||
# Helper function to stop the profiler and save the stats
|
|
||||||
def stats_cleanup(graph_execution_state_id: str) -> None:
|
|
||||||
if profiler:
|
|
||||||
profile_path = profiler.stop()
|
|
||||||
stats_path = profile_path.with_suffix(".json")
|
|
||||||
self._invoker.services.performance_statistics.dump_stats(
|
|
||||||
graph_execution_state_id=graph_execution_state_id, output_path=stats_path
|
|
||||||
)
|
|
||||||
# We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
|
|
||||||
# we don't care about that - suppress the error.
|
|
||||||
with suppress(GESStatsNotFoundError):
|
|
||||||
self._invoker.services.performance_statistics.log_stats(graph_execution_state_id)
|
|
||||||
self._invoker.services.performance_statistics.reset_stats()
|
|
||||||
|
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
poll_now_event.clear()
|
poll_now_event.clear()
|
||||||
# Middle processor try block; any unhandled exception is a non-fatal processor error
|
# Middle processor try block; any unhandled exception is a non-fatal processor error
|
||||||
@ -132,8 +118,8 @@ class DefaultSessionProcessor(SessionProcessorBase):
|
|||||||
cancel_event.clear()
|
cancel_event.clear()
|
||||||
|
|
||||||
# If profiling is enabled, start the profiler
|
# If profiling is enabled, start the profiler
|
||||||
if profiler is not None:
|
if self._profiler is not None:
|
||||||
profiler.start(profile_id=self._queue_item.session_id)
|
self._profiler.start(profile_id=self._queue_item.session_id)
|
||||||
|
|
||||||
# Prepare invocations and take the first
|
# Prepare invocations and take the first
|
||||||
self._invocation = self._queue_item.session.next()
|
self._invocation = self._queue_item.session.next()
|
||||||
@ -228,6 +214,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
|
|||||||
)
|
)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# The session is complete if the all invocations are complete or there was an error
|
||||||
if self._queue_item.session.is_complete() or cancel_event.is_set():
|
if self._queue_item.session.is_complete() or cancel_event.is_set():
|
||||||
# Send complete event
|
# Send complete event
|
||||||
self._invoker.services.events.emit_graph_execution_complete(
|
self._invoker.services.events.emit_graph_execution_complete(
|
||||||
@ -236,8 +223,20 @@ class DefaultSessionProcessor(SessionProcessorBase):
|
|||||||
queue_id=self._queue_item.queue_id,
|
queue_id=self._queue_item.queue_id,
|
||||||
graph_execution_state_id=self._queue_item.session.id,
|
graph_execution_state_id=self._queue_item.session.id,
|
||||||
)
|
)
|
||||||
# Save the stats and stop the profiler if it's running
|
# If we are profiling, stop the profiler and dump the profile & stats
|
||||||
stats_cleanup(self._queue_item.session.id)
|
if self._profiler:
|
||||||
|
profile_path = self._profiler.stop()
|
||||||
|
stats_path = profile_path.with_suffix(".json")
|
||||||
|
self._invoker.services.performance_statistics.dump_stats(
|
||||||
|
graph_execution_state_id=self._queue_item.session.id, output_path=stats_path
|
||||||
|
)
|
||||||
|
# We'll get a GESStatsNotFoundError if we try to log stats for an untracked graph, but in the processor
|
||||||
|
# we don't care about that - suppress the error.
|
||||||
|
with suppress(GESStatsNotFoundError):
|
||||||
|
self._invoker.services.performance_statistics.log_stats(self._queue_item.session.id)
|
||||||
|
self._invoker.services.performance_statistics.reset_stats()
|
||||||
|
|
||||||
|
# Set the invocation to None to prepare for the next session
|
||||||
self._invocation = None
|
self._invocation = None
|
||||||
else:
|
else:
|
||||||
# Prepare the next invocation
|
# Prepare the next invocation
|
||||||
@ -252,14 +251,18 @@ class DefaultSessionProcessor(SessionProcessorBase):
|
|||||||
poll_now_event.wait(self._polling_interval)
|
poll_now_event.wait(self._polling_interval)
|
||||||
continue
|
continue
|
||||||
except Exception:
|
except Exception:
|
||||||
# Non-fatal error in processor, cancel the queue item and wait for next polling interval or event
|
# Non-fatal error in processor
|
||||||
self._invoker.services.logger.error(
|
self._invoker.services.logger.error(
|
||||||
f"Non-fatal error in session processor:\n{traceback.format_exc()}"
|
f"Non-fatal error in session processor:\n{traceback.format_exc()}"
|
||||||
)
|
)
|
||||||
|
# Cancel the queue item
|
||||||
if self._queue_item is not None:
|
if self._queue_item is not None:
|
||||||
self._invoker.services.session_queue.cancel_queue_item(
|
self._invoker.services.session_queue.cancel_queue_item(
|
||||||
self._queue_item.item_id, error=traceback.format_exc()
|
self._queue_item.item_id, error=traceback.format_exc()
|
||||||
)
|
)
|
||||||
|
# Reset the invocation to None to prepare for the next session
|
||||||
|
self._invocation = None
|
||||||
|
# Immediately poll for next queue item
|
||||||
poll_now_event.wait(self._polling_interval)
|
poll_now_event.wait(self._polling_interval)
|
||||||
continue
|
continue
|
||||||
except Exception:
|
except Exception:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user