diff --git a/Wabbajack.Common/WorkQueue.cs b/Wabbajack.Common/WorkQueue.cs index ef1eb526..6a24adbd 100644 --- a/Wabbajack.Common/WorkQueue.cs +++ b/Wabbajack.Common/WorkQueue.cs @@ -5,11 +5,13 @@ using System.Linq; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using DynamicData; using Wabbajack.Common.StatusFeed; +[assembly: InternalsVisibleTo("Wabbajack.Test")] namespace Wabbajack.Common { public class WorkQueue : IDisposable @@ -29,7 +31,7 @@ namespace Wabbajack.Common public IObservable Status => _Status; private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned" - private List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>(); + internal List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>(); public int DesiredNumWorkers { get; private set; } = 0; private CancellationTokenSource _shutdown = new CancellationTokenSource(); @@ -48,6 +50,8 @@ namespace Wabbajack.Common private readonly Subject> _activeNumThreadsObservable = new Subject>(); + internal const int PollMS = 200; + /// /// Creates a WorkQueue with the given number of threads /// @@ -68,7 +72,6 @@ namespace Wabbajack.Common // Select the latest driving observable .Select(x => x ?? Observable.Return(Environment.ProcessorCount)) .Switch() - .StartWith(Environment.ProcessorCount) .DistinctUntilChanged() // Add new threads if it increases .SelectTask(AddNewThreadsIfNeeded) @@ -116,17 +119,23 @@ namespace Wabbajack.Common { Report("Waiting", 0, false); if (_shutdown.IsCancellationRequested) return; + + Func f; + bool got; try { - f = Queue.Take(_shutdown.Token); + got = Queue.TryTake(out f, PollMS, _shutdown.Token); } catch (Exception) { throw new OperationCanceledException(); } - await f(); + if (got) + { + await f(); + } // Check if we're currently trimming threads if (DesiredNumWorkers >= _tasks.Count) continue;