From 16676feea84663b6075a42ef8bd3641ca4d7fc7e Mon Sep 17 00:00:00 2001 From: psychedelicious <4822129+psychedelicious@users.noreply.github.com> Date: Sun, 18 Feb 2024 12:33:16 +1100 Subject: [PATCH] feat(nodes): make processor thread limit and polling interval configurable --- .../session_processor_default.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index dc08fc8345..3035a74a5a 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -18,12 +18,9 @@ from ..invoker import Invoker from .session_processor_base import SessionProcessorBase from .session_processor_common import SessionProcessorStatus -POLLING_INTERVAL = 1 -THREAD_LIMIT = 1 - class DefaultSessionProcessor(SessionProcessorBase): - def start(self, invoker: Invoker) -> None: + def start(self, invoker: Invoker, thread_limit: int = 1, polling_interval: int = 1) -> None: self._invoker: Invoker = invoker self._queue_item: Optional[SessionQueueItem] = None @@ -34,7 +31,10 @@ class DefaultSessionProcessor(SessionProcessorBase): local_handler.register(event_name=EventServiceBase.queue_event, _func=self._on_queue_event) - self._thread_limit = BoundedSemaphore(THREAD_LIMIT) + self._thread_limit = thread_limit + self._thread_semaphore = BoundedSemaphore(thread_limit) + self._polling_interval = polling_interval + self._thread = Thread( name="session_processor", target=self._process, @@ -88,7 +88,7 @@ class DefaultSessionProcessor(SessionProcessorBase): ): # Outermost processor try block; any unhandled exception is a fatal processor error try: - self._thread_limit.acquire() + self._thread_semaphore.acquire() stop_event.clear() resume_event.set() cancel_event.clear() @@ -247,7 +247,7 @@ class DefaultSessionProcessor(SessionProcessorBase): else: # The queue was empty, wait for next polling interval or event to try again self._invoker.services.logger.debug("Waiting for next polling interval or event") - poll_now_event.wait(POLLING_INTERVAL) + poll_now_event.wait(self._polling_interval) continue except Exception as e: # Non-fatal error in processor, cancel the queue item and wait for next polling interval or event @@ -256,7 +256,7 @@ class DefaultSessionProcessor(SessionProcessorBase): self._invoker.services.session_queue.cancel_queue_item( self._queue_item.item_id, error=traceback.format_exc() ) - poll_now_event.wait(POLLING_INTERVAL) + poll_now_event.wait(self._polling_interval) continue except Exception as e: # Fatal error in processor, log and pass - we're done here @@ -266,4 +266,4 @@ class DefaultSessionProcessor(SessionProcessorBase): stop_event.clear() poll_now_event.clear() self._queue_item = None - self._thread_limit.release() + self._thread_semaphore.release()