nodes-api: enforce single thread for the processor

On hyperthreaded CPUs we get two threads per core operating on the queue by
default. This causes two threads to process queue items concurrently,
which leads to PyTorch errors and sometimes generates garbage output.

Locking this to a single thread makes sense because we are bound by the
number of GPUs in the system, not by CPU cores. To parallelize across
GPUs we should instead start multiple processors (and use async
instead of threading).

Fixes #3289
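
For illustration, here is a minimal self-contained sketch of the pattern this
commit applies (names are made up; none of this is InvokeAI code): a
BoundedSemaphore(1) gates the processing loop, so a second worker thread
blocks in acquire() until the first one releases, and only one thread ever
drains the queue at a time.

    import queue
    import threading

    work_queue = queue.Queue()
    thread_limit = threading.BoundedSemaphore(1)  # at most one active processor

    def process(worker_id: int) -> None:
        # Gate the whole processing loop: the second thread blocks here
        # until the first one releases the semaphore in its finally block.
        thread_limit.acquire()
        try:
            while True:
                item = work_queue.get()
                if item is None:  # sentinel: this worker should stop
                    break
                print(f"worker {worker_id} processed item {item}")
        finally:
            thread_limit.release()

    for i in range(4):
        work_queue.put(i)
    work_queue.put(None)  # one stop sentinel per worker
    work_queue.put(None)

    # Two workers, as might happen with one OS thread per hyperthread:
    # the semaphore guarantees only one of them drains the queue at a time.
    workers = [threading.Thread(target=process, args=(n,)) for n in (1, 2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

Whichever thread wins the race owns the queue for its lifetime; the other
only proceeds once the first releases, and then exits on the stop sentinel.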
Eugene, 2023-05-01 11:13:16 -04:00 (committed by Eugene Brodsky)
parent 276dfc591b
commit d14a7d756e

@@ -1,5 +1,5 @@
 import traceback
-from threading import Event, Thread
+from threading import Event, Thread, BoundedSemaphore

 from ..invocations.baseinvocation import InvocationContext
 from .invocation_queue import InvocationQueueItem
@@ -10,8 +10,11 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
     __invoker_thread: Thread
     __stop_event: Event
     __invoker: Invoker
+    __threadLimit: BoundedSemaphore

     def start(self, invoker) -> None:
+        # if we do want multithreading at some point, we could make this configurable
+        self.__threadLimit = BoundedSemaphore(1)
         self.__invoker = invoker
         self.__stop_event = Event()
         self.__invoker_thread = Thread(
@@ -20,7 +23,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
             kwargs=dict(stop_event=self.__stop_event),
         )
         self.__invoker_thread.daemon = (
-            True  # TODO: probably better to just not use threads?
+            True  # TODO: make async and do not use threads
         )

         self.__invoker_thread.start()
@@ -29,6 +32,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):

     def __process(self, stop_event: Event):
         try:
+            self.__threadLimit.acquire()
             while not stop_event.is_set():
                 queue_item: InvocationQueueItem = self.__invoker.services.queue.get()
                 if not queue_item:  # Probably stopping
@@ -110,7 +114,7 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
                     )
                     pass

                 # Check queue to see if this is canceled, and skip if so
                 if self.__invoker.services.queue.is_canceled(
                     graph_execution_state.id
@@ -127,4 +131,6 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
             )

         except KeyboardInterrupt:
-            ...  # Log something?
+            pass  # Log something? KeyboardInterrupt is probably not going to be seen by the processor
+        finally:
+            self.__threadLimit.release()
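
A note on the choice of primitive, with a tiny snippet that is not part of the
commit: unlike a plain Semaphore, a BoundedSemaphore raises ValueError when
released more times than it was acquired, so a bookkeeping bug in the
acquire/release pairing fails loudly instead of silently raising the thread
limit. That is also why release() sits in a finally: block in the diff above:
the slot is returned even if the processing loop dies with an exception.

    import threading

    limit = threading.BoundedSemaphore(1)

    limit.acquire()
    limit.release()  # balanced acquire/release: fine

    try:
        limit.release()  # one release too many
    except ValueError as err:
        print(f"over-release detected: {err}")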