WorkQueue taking of queue swapped to TryTake

Helps shut down threads during quiet periods
This commit is contained in:
Justin Swanson
2020-01-11 17:17:45 -06:00
parent e2b2ea1727
commit 0106d020f9

View File

@ -5,11 +5,13 @@ using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DynamicData;
using Wabbajack.Common.StatusFeed;
[assembly: InternalsVisibleTo("Wabbajack.Test")]
namespace Wabbajack.Common
{
public class WorkQueue : IDisposable
@ -29,7 +31,7 @@ namespace Wabbajack.Common
public IObservable<CPUStatus> Status => _Status;
private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned"
private List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>();
internal List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>();
public int DesiredNumWorkers { get; private set; } = 0;
private CancellationTokenSource _shutdown = new CancellationTokenSource();
@ -48,6 +50,8 @@ namespace Wabbajack.Common
private readonly Subject<IObservable<int>> _activeNumThreadsObservable = new Subject<IObservable<int>>();
internal const int PollMS = 200;
/// <summary>
/// Creates a WorkQueue with the given number of threads
/// </summary>
@ -68,7 +72,6 @@ namespace Wabbajack.Common
// Select the latest driving observable
.Select(x => x ?? Observable.Return(Environment.ProcessorCount))
.Switch()
.StartWith(Environment.ProcessorCount)
.DistinctUntilChanged()
// Add new threads if it increases
.SelectTask(AddNewThreadsIfNeeded)
@ -116,17 +119,23 @@ namespace Wabbajack.Common
{
Report("Waiting", 0, false);
if (_shutdown.IsCancellationRequested) return;
Func<Task> f;
bool got;
try
{
f = Queue.Take(_shutdown.Token);
got = Queue.TryTake(out f, PollMS, _shutdown.Token);
}
catch (Exception)
{
throw new OperationCanceledException();
}
if (got)
{
await f();
}
// Check if we're currently trimming threads
if (DesiredNumWorkers >= _tasks.Count) continue;