feat(nodes): make processor thread limit and polling interval configurable

This commit is contained in:
psychedelicious 2024-02-18 12:33:16 +11:00
parent 51133522b7
commit 4956fa282b

View File

@ -18,12 +18,9 @@ from ..invoker import Invoker
from .session_processor_base import SessionProcessorBase from .session_processor_base import SessionProcessorBase
from .session_processor_common import SessionProcessorStatus from .session_processor_common import SessionProcessorStatus
POLLING_INTERVAL = 1
THREAD_LIMIT = 1
class DefaultSessionProcessor(SessionProcessorBase): class DefaultSessionProcessor(SessionProcessorBase):
def start(self, invoker: Invoker) -> None: def start(self, invoker: Invoker, thread_limit: int = 1, polling_interval: int = 1) -> None:
self._invoker: Invoker = invoker self._invoker: Invoker = invoker
self._queue_item: Optional[SessionQueueItem] = None self._queue_item: Optional[SessionQueueItem] = None
@ -34,7 +31,10 @@ class DefaultSessionProcessor(SessionProcessorBase):
local_handler.register(event_name=EventServiceBase.queue_event, _func=self._on_queue_event) local_handler.register(event_name=EventServiceBase.queue_event, _func=self._on_queue_event)
self._thread_limit = BoundedSemaphore(THREAD_LIMIT) self._thread_limit = thread_limit
self._thread_semaphore = BoundedSemaphore(thread_limit)
self._polling_interval = polling_interval
self._thread = Thread( self._thread = Thread(
name="session_processor", name="session_processor",
target=self._process, target=self._process,
@ -88,7 +88,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
): ):
# Outermost processor try block; any unhandled exception is a fatal processor error # Outermost processor try block; any unhandled exception is a fatal processor error
try: try:
self._thread_limit.acquire() self._thread_semaphore.acquire()
stop_event.clear() stop_event.clear()
resume_event.set() resume_event.set()
cancel_event.clear() cancel_event.clear()
@ -247,7 +247,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
else: else:
# The queue was empty, wait for next polling interval or event to try again # The queue was empty, wait for next polling interval or event to try again
self._invoker.services.logger.debug("Waiting for next polling interval or event") self._invoker.services.logger.debug("Waiting for next polling interval or event")
poll_now_event.wait(POLLING_INTERVAL) poll_now_event.wait(self._polling_interval)
continue continue
except Exception as e: except Exception as e:
# Non-fatal error in processor, cancel the queue item and wait for next polling interval or event # Non-fatal error in processor, cancel the queue item and wait for next polling interval or event
@ -256,7 +256,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
self._invoker.services.session_queue.cancel_queue_item( self._invoker.services.session_queue.cancel_queue_item(
self._queue_item.item_id, error=traceback.format_exc() self._queue_item.item_id, error=traceback.format_exc()
) )
poll_now_event.wait(POLLING_INTERVAL) poll_now_event.wait(self._polling_interval)
continue continue
except Exception as e: except Exception as e:
# Fatal error in processor, log and pass - we're done here # Fatal error in processor, log and pass - we're done here
@ -266,4 +266,4 @@ class DefaultSessionProcessor(SessionProcessorBase):
stop_event.clear() stop_event.clear()
poll_now_event.clear() poll_now_event.clear()
self._queue_item = None self._queue_item = None
self._thread_limit.release() self._thread_semaphore.release()