2022-12-01 05:33:20 +00:00
|
|
|
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
|
|
|
|
|
|
|
|
from abc import ABC
|
2023-07-03 16:17:45 +00:00
|
|
|
from typing import Optional
|
2023-03-03 06:02:00 +00:00
|
|
|
|
|
|
|
from .graph import Graph, GraphExecutionState
|
2023-07-03 16:17:45 +00:00
|
|
|
from .invocation_queue import InvocationQueueItem
|
2023-03-03 06:02:00 +00:00
|
|
|
from .invocation_services import InvocationServices
|
2022-12-01 05:33:20 +00:00
|
|
|
|
|
|
|
class Invoker:
    """The invoker, used to execute invocations"""

    # Container whose attributes are the individual services. Every attribute
    # is probed for `start`/`stop` hooks by `_start()` and `stop()`.
    services: InvocationServices

    def __init__(self, services: InvocationServices):
        """Stores the services and immediately starts them via `_start()`."""
        self.services = services
        self._start()

    def invoke(
        self, graph_execution_state: GraphExecutionState, invoke_all: bool = False
    ) -> Optional[str]:
        """Determines the next node to invoke and enqueues it, preparing if needed.

        If a batch configured on the graph matches the invocation, the batch
        datum selected by the state's `batch_indices` is applied to the
        invocation before the state is saved and the invocation is queued.

        Returns the id of the queued node, or `None` if there are no nodes left to enqueue."""

        # Get the next invocation
        invocation = graph_execution_state.next()
        if not invocation:
            return None

        # Find the first batch whose node id occurs in the invocation id.
        # NOTE(review): `in` is a containment test here, not an equality test;
        # presumably both ids are strings and a substring match is intended - confirm.
        (index, batch) = next(
            (
                (i, b)
                for i, b in enumerate(graph_execution_state.graph.batches)
                if b.node_id in invocation.id
            ),
            (None, None),
        )
        if batch:
            # TODO: validate that the invocation's type matches the batch's
            # configured node type before applying the batch data.
            batch_index = graph_execution_state.batch_indices[index]
            datum = batch.data[batch_index]
            for param in datum.keys():
                # NOTE(review): this relies on the invocation supporting item
                # assignment; if invocations are plain attribute-based models,
                # `setattr(invocation, param, ...)` is needed instead - confirm.
                invocation[param] = datum[param]

        # Save the execution state
        self.services.graph_execution_manager.set(graph_execution_state)

        # Queue the invocation
        self.services.queue.put(
            InvocationQueueItem(
                graph_execution_state_id=graph_execution_state.id,
                invocation_id=invocation.id,
                invoke_all=invoke_all,
            )
        )

        return invocation.id

    def create_execution_state(self, graph: Optional[Graph] = None) -> GraphExecutionState:
        """Creates a new execution state for the given graph

        When `graph` is `None`, an empty `Graph()` is used. If the supplied
        graph has batches, the state's `batch_indices` is initialised with the
        last valid index of each batch's data. The new state is saved through
        the graph execution manager before it is returned."""
        new_state = GraphExecutionState(graph=Graph() if graph is None else graph)

        # `graph` is optional, so guard before touching `graph.batches`.
        if graph is not None and graph.batches:
            # `invoke()` reads `batch_indices`, so the indices must be stored
            # under that same attribute name.
            new_state.batch_indices = [len(batch.data) - 1 for batch in graph.batches]

        self.services.graph_execution_manager.set(new_state)
        return new_state

    def cancel(self, graph_execution_state_id: str) -> None:
        """Cancels the given execution state"""
        self.services.queue.cancel(graph_execution_state_id)

    def __start_service(self, service) -> None:
        """Calls `service.start(self)` when the service has a callable `start`."""
        # Call start() method on any services that have it
        start_op = getattr(service, "start", None)
        if callable(start_op):
            start_op(self)

    def __stop_service(self, service) -> None:
        """Calls `service.stop(self)` when the service has a callable `stop`."""
        # Call stop() method on any services that have it
        stop_op = getattr(service, "stop", None)
        if callable(stop_op):
            stop_op(self)

    def _start(self) -> None:
        """Starts the invoker. This is called automatically when the invoker is created."""
        # Every instance attribute of the services container is treated as a service.
        for service in vars(self.services):
            self.__start_service(getattr(self.services, service))

    def stop(self) -> None:
        """Stops the invoker. A new invoker will have to be created to execute further."""
        # First stop all services
        for service in vars(self.services):
            self.__stop_service(getattr(self.services, service))

        # NOTE(review): presumably `None` is a sentinel telling the queue's
        # consumer to stop - confirm against the queue implementation.
        self.services.queue.put(None)
|
2022-12-01 05:33:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
class InvocationProcessorABC(ABC):
    """Base class for invocation processors; it declares no members of its own."""
|