diff --git a/Wabbajack.Common/WorkQueue.cs b/Wabbajack.Common/WorkQueue.cs index 5de69834..e9df2a6c 100644 --- a/Wabbajack.Common/WorkQueue.cs +++ b/Wabbajack.Common/WorkQueue.cs @@ -48,23 +48,40 @@ namespace Wabbajack.Common private readonly Subject> _activeNumThreadsObservable = new Subject>(); + /// + /// Creates a WorkQueue with the given number of threads + /// + /// Number of threads for the WorkQueue to have. Null represents default, which is the Processor count of the machine. public WorkQueue(int? threadCount = null) : this(Observable.Return(threadCount ?? Environment.ProcessorCount)) { } + /// + /// Creates a WorkQueue whos number of threads is determined by the given observable + /// + /// Driving observable that determines how many threads should be actively pulling jobs from the queue public WorkQueue(IObservable numThreads) { + // Hook onto the number of active threads subject, and subscribe to it for changes _activeNumThreadsObservable + // Select the latest driving observable .Select(x => x ?? Observable.Return(Environment.ProcessorCount)) .Switch() + .StartWith(Environment.ProcessorCount) .DistinctUntilChanged() + // Add new threads if it increases .SelectTask(AddNewThreadsIfNeeded) .Subscribe() .DisposeWith(_disposables); + // Set the incoming driving observable to be active SetActiveThreadsObservable(numThreads); } + /// + /// Sets the driving observable that determines how many threads should be actively pulling jobs from the queue + /// + /// Driving observable that determines how many threads should be actively pulling jobs from the queue public void SetActiveThreadsObservable(IObservable numThreads) { _activeNumThreadsObservable.OnNext(numThreads);