feat(queue): take one functionality in session processor

Executes the next queue item, then pauses. Does nothing if the queue is already running.
This commit is contained in:
psychedelicious 2023-09-26 19:29:53 +10:00
parent 3e01c396e1
commit 5e6b5c8fd6
3 changed files with 32 additions and 0 deletions

View File

@ -93,6 +93,18 @@ async def Pause(
return ApiDependencies.invoker.services.session_processor.pause()
@session_queue_router.put(
"/{queue_id}/processor/take_one",
operation_id="take_one",
responses={200: {"model": SessionProcessorStatus}},
)
async def take_one(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
"""Executes the next-in-line queue item, pausing the processor afterwards. Has no effect if the queue is resumed."""
return ApiDependencies.invoker.services.session_processor.take_one()
@session_queue_router.put(
"/{queue_id}/cancel_by_batch_ids",
operation_id="cancel_by_batch_ids",

View File

@ -22,6 +22,11 @@ class SessionProcessorBase(ABC):
"""Pauses the session processor"""
pass
@abstractmethod
def take_one(self) -> SessionProcessorStatus:
"""Takes one session from the queue and executes it"""
pass
@abstractmethod
def get_status(self) -> SessionProcessorStatus:
"""Gets the status of the session processor"""

View File

@ -25,6 +25,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
self.__resume_event = ThreadEvent()
self.__stop_event = ThreadEvent()
self.__poll_now_event = ThreadEvent()
self.__take_one_event = ThreadEvent()
local_handler.register(event_name=EventServiceBase.queue_event, _func=self._on_queue_event)
@ -36,6 +37,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
"stop_event": self.__stop_event,
"poll_now_event": self.__poll_now_event,
"resume_event": self.__resume_event,
"take_one_event": self.__take_one_event,
},
)
self.__thread.start()
@ -81,6 +83,13 @@ class DefaultSessionProcessor(SessionProcessorBase):
self.__resume_event.clear()
return self.get_status()
def take_one(self) -> SessionProcessorStatus:
if self.__queue_item is None and not self.__resume_event.is_set():
self.__resume_event.set()
self.__take_one_event.set()
self._poll_now()
return self.get_status()
def get_status(self) -> SessionProcessorStatus:
return SessionProcessorStatus(
is_started=self.__resume_event.is_set(),
@ -92,9 +101,11 @@ class DefaultSessionProcessor(SessionProcessorBase):
stop_event: ThreadEvent,
poll_now_event: ThreadEvent,
resume_event: ThreadEvent,
take_one_event: ThreadEvent,
):
try:
stop_event.clear()
take_one_event.clear()
resume_event.set()
self.__threadLimit.acquire()
queue_item: Optional[SessionQueueItem] = None
@ -118,6 +129,10 @@ class DefaultSessionProcessor(SessionProcessorBase):
)
queue_item = None
if take_one_event.is_set():
resume_event.clear()
take_one_event.clear()
if queue_item is None:
self.__invoker.services.logger.debug("Waiting for next polling interval or event")
poll_now_event.wait(POLLING_INTERVAL)