feat(nodes): make processor thread limit and polling interval configurable

This commit is contained in:
psychedelicious 2024-02-18 12:33:16 +11:00
parent 0788a27a80
commit 16676feea8

View File

@ -18,12 +18,9 @@ from ..invoker import Invoker
from .session_processor_base import SessionProcessorBase
from .session_processor_common import SessionProcessorStatus
POLLING_INTERVAL = 1
THREAD_LIMIT = 1
class DefaultSessionProcessor(SessionProcessorBase):
def start(self, invoker: Invoker) -> None:
def start(self, invoker: Invoker, thread_limit: int = 1, polling_interval: int = 1) -> None:
self._invoker: Invoker = invoker
self._queue_item: Optional[SessionQueueItem] = None
@ -34,7 +31,10 @@ class DefaultSessionProcessor(SessionProcessorBase):
local_handler.register(event_name=EventServiceBase.queue_event, _func=self._on_queue_event)
self._thread_limit = BoundedSemaphore(THREAD_LIMIT)
self._thread_limit = thread_limit
self._thread_semaphore = BoundedSemaphore(thread_limit)
self._polling_interval = polling_interval
self._thread = Thread(
name="session_processor",
target=self._process,
@ -88,7 +88,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
):
# Outermost processor try block; any unhandled exception is a fatal processor error
try:
self._thread_limit.acquire()
self._thread_semaphore.acquire()
stop_event.clear()
resume_event.set()
cancel_event.clear()
@ -247,7 +247,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
else:
# The queue was empty, wait for next polling interval or event to try again
self._invoker.services.logger.debug("Waiting for next polling interval or event")
poll_now_event.wait(POLLING_INTERVAL)
poll_now_event.wait(self._polling_interval)
continue
except Exception as e:
# Non-fatal error in processor, cancel the queue item and wait for next polling interval or event
@ -256,7 +256,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
self._invoker.services.session_queue.cancel_queue_item(
self._queue_item.item_id, error=traceback.format_exc()
)
poll_now_event.wait(POLLING_INTERVAL)
poll_now_event.wait(self._polling_interval)
continue
except Exception as e:
# Fatal error in processor, log and pass - we're done here
@ -266,4 +266,4 @@ class DefaultSessionProcessor(SessionProcessorBase):
stop_event.clear()
poll_now_event.clear()
self._queue_item = None
self._thread_limit.release()
self._thread_semaphore.release()