fix(queue): fix duplicate queue item status events

This commit is contained in:
psychedelicious 2023-09-21 08:48:03 +10:00 committed by Kent Keirsey
parent 098d506b95
commit 183e2c3ee0

View File

@ -77,7 +77,6 @@ class SqliteSessionQueue(SessionQueueBase):
queue_item = self.get_queue_item(item_id)
if queue_item.status not in ["completed", "failed", "canceled"]:
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="completed")
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
except SessionQueueItemNotFoundError:
return
@ -86,8 +85,8 @@ class SqliteSessionQueue(SessionQueueBase):
item_id = event[1]["data"]["queue_item_id"]
error = event[1]["data"]["error"]
queue_item = self.get_queue_item(item_id)
# always set to failed if have an error, even if previously the item was marked completed or canceled
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="failed", error=error)
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
except SessionQueueItemNotFoundError:
return
@ -95,8 +94,8 @@ class SqliteSessionQueue(SessionQueueBase):
try:
item_id = event[1]["data"]["queue_item_id"]
queue_item = self.get_queue_item(item_id)
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="canceled")
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
if queue_item.status not in ["completed", "failed", "canceled"]:
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="canceled")
except SessionQueueItemNotFoundError:
return
@ -354,7 +353,6 @@ class SqliteSessionQueue(SessionQueueBase):
return None
queue_item = SessionQueueItem.from_dict(dict(result))
queue_item = self._set_queue_item_status(item_id=queue_item.item_id, status="in_progress")
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
return queue_item
def get_next(self, queue_id: str) -> Optional[SessionQueueItem]:
@ -427,7 +425,9 @@ class SqliteSessionQueue(SessionQueueBase):
raise
finally:
self.__lock.release()
return self.get_queue_item(item_id)
queue_item = self.get_queue_item(item_id)
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
return queue_item
def is_empty(self, queue_id: str) -> IsEmptyResult:
try:
@ -565,7 +565,6 @@ class SqliteSessionQueue(SessionQueueBase):
queue_batch_id=queue_item.batch_id,
graph_execution_state_id=queue_item.session_id,
)
self.__invoker.services.events.emit_queue_item_status_changed(queue_item)
return queue_item
def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult: