wabbajack/Wabbajack.Common/WorkQueue.cs
2020-08-11 10:37:17 -05:00

201 lines
7.6 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.Common.StatusFeed;
[assembly: InternalsVisibleTo("Wabbajack.Test")]
namespace Wabbajack.Common
{
public class WorkQueue : IDisposable
{
internal AsyncBlockingCollection<Func<Task>> Queue = new AsyncBlockingCollection<Func<Task>>();
public const int UnassignedCpuId = 0;
private static readonly AsyncLocal<int> _cpuId = new AsyncLocal<int>();
public int CpuId => _cpuId.Value;
public static bool WorkerThread => AsyncLocalCurrentQueue.Value != null;
public bool IsWorkerThread => WorkerThread;
internal static readonly AsyncLocal<WorkQueue?> AsyncLocalCurrentQueue = new AsyncLocal<WorkQueue?>();
public static WorkQueue? AsyncLocalQueue => AsyncLocalCurrentQueue.Value;
private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();
public IObservable<CPUStatus> Status => _Status;
private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned"
// Public for testing reasons
public Dictionary<int, Task> _tasks = new Dictionary<int, Task>();
public int DesiredNumWorkers { get; private set; } = 0;
private CancellationTokenSource _shutdown = new CancellationTokenSource();
private CompositeDisposable _disposables = new CompositeDisposable();
// This is currently a lie, as it wires to the Utils singleton stream This is still good to have,
// so that logic related to a single WorkQueue can subscribe to this dummy member so that If/when we
// implement log messages in a non-singleton fashion, they will already be wired up properly.
public IObservable<IStatusMessage> LogMessages => Utils.LogMessages;
private AsyncLock _lock = new AsyncLock();
private readonly BehaviorSubject<(int DesiredCPUs, int CurrentCPUs)> _cpuCountSubj = new BehaviorSubject<(int DesiredCPUs, int CurrentCPUs)>((0, 0));
public IObservable<(int CurrentCPUs, int DesiredCPUs)> CurrentCpuCount => _cpuCountSubj;
private readonly Subject<IObservable<int>?> _activeNumThreadsObservable = new Subject<IObservable<int>?>();
public static TimeSpan PollMS = TimeSpan.FromMilliseconds(200);
/// <summary>
/// Creates a WorkQueue with the given number of threads
/// </summary>
/// <param name="threadCount">Number of threads for the WorkQueue to have. Null represents default, which is the Processor count of the machine.</param>
public WorkQueue(int? threadCount = null)
: this(Observable.Return(threadCount ?? Environment.ProcessorCount))
{
}
/// <summary>
/// Creates a WorkQueue whos number of threads is determined by the given observable
/// </summary>
/// <param name="numThreads">Driving observable that determines how many threads should be actively pulling jobs from the queue</param>
public WorkQueue(IObservable<int>? numThreads)
{
// Hook onto the number of active threads subject, and subscribe to it for changes
_disposables.Add(_activeNumThreadsObservable
// Select the latest driving observable
.Select(x => x ?? Observable.Return(Environment.ProcessorCount))
.Switch()
.DistinctUntilChanged()
// Add new threads if it increases
.SelectTask(AddNewThreadsIfNeeded)
.Subscribe());
// Set the incoming driving observable to be active
SetActiveThreadsObservable(numThreads);
}
/// <summary>
/// Sets the driving observable that determines how many threads should be actively pulling jobs from the queue
/// </summary>
/// <param name="numThreads">Driving observable that determines how many threads should be actively pulling jobs from the queue</param>
public void SetActiveThreadsObservable(IObservable<int>? numThreads)
{
_activeNumThreadsObservable.OnNext(numThreads);
}
private async Task AddNewThreadsIfNeeded(int desired)
{
using (await _lock.WaitAsync())
{
DesiredNumWorkers = desired;
while (DesiredNumWorkers > _tasks.Count)
{
var cpuID = _nextCpuID++;
_tasks[cpuID] = Task.Run(async () =>
{
await ThreadBody(cpuID);
});
}
_cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
}
}
private async Task ThreadBody(int cpuID)
{
_cpuId.Value = cpuID;
AsyncLocalCurrentQueue.Value = this;
try
{
while (true)
{
Report("Waiting", Percent.Zero, false);
if (_shutdown.IsCancellationRequested) return;
Func<Task> f;
bool got;
try
{
(got, f) = await Queue.TryTake(PollMS, _shutdown.Token);
}
catch (Exception)
{
throw new OperationCanceledException();
}
if (got)
{
await f();
}
// Check if we're currently trimming threads
if (DesiredNumWorkers >= _tasks.Count) continue;
// Noticed that we may need to shut down, lock and check again
using (await _lock.WaitAsync())
{
// Check if another thread shut down before this one and got us back to the desired amount already
if (DesiredNumWorkers >= _tasks.Count) continue;
// Shutdown
if (!_tasks.Remove(cpuID))
{
Utils.Error($"Could not remove thread from workpool with CPU ID {cpuID}");
}
Report("Shutting down", Percent.Zero, false);
_cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
return;
}
}
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
Utils.Error(ex, "Error in WorkQueue thread.");
}
}
public void Report(string msg, Percent progress, bool isWorking = true)
{
_Status.OnNext(
new CPUStatus
{
ProgressPercent = progress,
Msg = msg,
ID = _cpuId.Value,
IsWorking = isWorking
});
}
public void QueueTask(Func<Task> a)
{
Queue.Add(a);
}
public void Dispose()
{
_shutdown.Cancel();
_disposables.Dispose();
Queue?.Dispose();
}
}
public class CPUStatus
{
public Percent ProgressPercent { get; internal set; }
public string Msg { get; internal set; } = string.Empty;
public int ID { get; internal set; }
public bool IsWorking { get; internal set; }
}
}