Union[foo, None]=>Optional[foo]

This commit is contained in:
Lincoln Stein
2023-07-03 12:17:45 -04:00
parent ac9ec4e75a
commit ed86d0b708
26 changed files with 75 additions and 96 deletions

View File

@ -1,15 +1,11 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from abc import ABC
from threading import Event, Thread
from typing import Union
from typing import Optional
from ..invocations.baseinvocation import InvocationContext
from .graph import Graph, GraphExecutionState
from .invocation_queue import InvocationQueueABC, InvocationQueueItem
from .invocation_queue import InvocationQueueItem
from .invocation_services import InvocationServices
from .item_storage import ItemStorageABC
class Invoker:
"""The invoker, used to execute invocations"""
@ -22,7 +18,7 @@ class Invoker:
def invoke(
self, graph_execution_state: GraphExecutionState, invoke_all: bool = False
) -> Union[str, None]:
) -> Optional[str]:
"""Determines the next node to invoke and enqueues it, preparing if needed.
Returns the id of the queued node, or `None` if there are no nodes left to enqueue."""
@ -46,7 +42,7 @@ class Invoker:
return invocation.id
def create_execution_state(self, graph: Union[Graph, None] = None) -> GraphExecutionState:
def create_execution_state(self, graph: Optional[Graph] = None) -> GraphExecutionState:
"""Creates a new execution state for the given graph"""
new_state = GraphExecutionState(graph=Graph() if graph is None else graph)
self.services.graph_execution_manager.set(new_state)