wabbajack/Wabbajack.Common/WorkQueue.cs

202 lines
7.6 KiB
C#
Raw Normal View History

using System;
2019-07-22 22:17:46 +00:00
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
2019-07-22 22:17:46 +00:00
using System.Threading;
using System.Threading.Tasks;
using DynamicData;
2019-12-04 04:12:08 +00:00
using Wabbajack.Common.StatusFeed;
2019-07-22 22:17:46 +00:00
[assembly: InternalsVisibleTo("Wabbajack.Test")]
2019-07-22 22:17:46 +00:00
namespace Wabbajack.Common
{
/// <summary>
/// Runs queued asynchronous jobs on a set of worker loops. The number of workers follows a
/// driving observable of desired thread counts: workers are added when the desired count rises,
/// and individual workers retire themselves when it falls.
/// </summary>
public class WorkQueue : IDisposable
{
    /// <summary>Pending jobs. Every worker polls this collection for its next job.</summary>
    internal AsyncBlockingCollection<Func<Task>> Queue = new AsyncBlockingCollection<Func<Task>>();

    /// <summary>CPU ID value meaning "no worker ID assigned"; real worker IDs start at 1.</summary>
    public const int UnassignedCpuId = 0;

    // Set at the top of ThreadBody, so the value follows the worker's async flow.
    private static readonly AsyncLocal<int> _cpuId = new AsyncLocal<int>();

    /// <summary>
    /// CPU ID recorded for the current async flow, or <see cref="UnassignedCpuId"/> (the
    /// AsyncLocal default) when the caller is not running inside a worker.
    /// </summary>
    public int CpuId => _cpuId.Value;

    /// <summary>True when the current async flow has a <see cref="WorkQueue"/> recorded as its owner.</summary>
    public static bool WorkerThread => AsyncLocalCurrentQueue.Value != null;

    /// <summary>Instance-level alias of <see cref="WorkerThread"/>.</summary>
    public bool IsWorkerThread => WorkerThread;

    /// <summary>The queue whose worker loop the current async flow is running in, if any.</summary>
    internal static readonly AsyncLocal<WorkQueue?> AsyncLocalCurrentQueue = new AsyncLocal<WorkQueue?>();

    private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();

    /// <summary>Stream of the status updates published through <see cref="Report"/>.</summary>
    public IObservable<CPUStatus> Status => _Status;

    private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned"

    /// <summary>Running worker tasks keyed by CPU ID. Public for testing reasons.</summary>
    public Dictionary<int, Task> _tasks = new Dictionary<int, Task>();

    /// <summary>Most recent worker count requested by the driving observable.</summary>
    public int DesiredNumWorkers { get; private set; } = 0;

    private CancellationTokenSource _shutdown = new CancellationTokenSource();
    private CompositeDisposable _disposables = new CompositeDisposable();

    // This is currently a lie, as it wires to the Utils singleton stream. It is still good to have:
    // logic related to a single WorkQueue can subscribe to this member, so that if/when log messages
    // are implemented in a non-singleton fashion, those subscribers will already be wired up properly.
    public IObservable<IStatusMessage> LogMessages => Utils.LogMessages;

    // Guards _tasks and DesiredNumWorkers while workers are being added or retired.
    private AsyncLock _lock = new AsyncLock();

    // Element names match the order of the values pushed via OnNext: (current worker count, desired count).
    private readonly BehaviorSubject<(int CurrentCPUs, int DesiredCPUs)> _cpuCountSubj = new BehaviorSubject<(int CurrentCPUs, int DesiredCPUs)>((0, 0));

    /// <summary>Emits the current and desired worker counts each time workers are added or retired.</summary>
    public IObservable<(int CurrentCPUs, int DesiredCPUs)> CurrentCpuCount => _cpuCountSubj;

    private readonly Subject<IObservable<int>?> _activeNumThreadsObservable = new Subject<IObservable<int>?>();

    /// <summary>
    /// Timeout handed to the queue on every poll. Despite the name this is a <see cref="TimeSpan"/>,
    /// not a raw millisecond count. After each poll the worker re-checks shutdown and trimming.
    /// </summary>
    public static TimeSpan PollMS = TimeSpan.FromMilliseconds(200);

    /// <summary>
    /// Creates a WorkQueue with the given number of threads
    /// </summary>
    /// <param name="threadCount">Number of threads for the WorkQueue to have. Null represents default, which is the Processor count of the machine.</param>
    public WorkQueue(int? threadCount = null)
        : this(Observable.Return(threadCount ?? Environment.ProcessorCount))
    {
    }

    /// <summary>
    /// Creates a WorkQueue whose number of threads is determined by the given observable
    /// </summary>
    /// <param name="numThreads">Driving observable that determines how many threads should be actively pulling jobs from the queue. Null falls back to the Processor count of the machine.</param>
    public WorkQueue(IObservable<int>? numThreads)
    {
        // Hook onto the number of active threads subject, and subscribe to it for changes
        _activeNumThreadsObservable
            // Select the latest driving observable
            .Select(x => x ?? Observable.Return(Environment.ProcessorCount))
            .Switch()
            .DistinctUntilChanged()
            // Add new threads if it increases
            .SelectTask(AddNewThreadsIfNeeded)
            .Subscribe()
            .DisposeWith(_disposables);

        // Set the incoming driving observable to be active
        SetActiveThreadsObservable(numThreads);
    }

    /// <summary>
    /// Sets the driving observable that determines how many threads should be actively pulling jobs from the queue
    /// </summary>
    /// <param name="numThreads">Driving observable that determines how many threads should be actively pulling jobs from the queue</param>
    public void SetActiveThreadsObservable(IObservable<int>? numThreads)
    {
        _activeNumThreadsObservable.OnNext(numThreads);
    }

    /// <summary>
    /// Records the new desired worker count and starts workers until the pool reaches it.
    /// This method never removes workers; surplus workers retire themselves inside
    /// <see cref="ThreadBody"/>.
    /// </summary>
    /// <param name="desired">Target number of workers.</param>
    private async Task AddNewThreadsIfNeeded(int desired)
    {
        using (await _lock.WaitAsync())
        {
            DesiredNumWorkers = desired;
            while (DesiredNumWorkers > _tasks.Count)
            {
                var cpuID = _nextCpuID++;
                _tasks[cpuID] = Task.Run(async () =>
                {
                    await ThreadBody(cpuID);
                });
            }

            _cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
        }
    }

    /// <summary>
    /// Worker loop: polls the queue, runs whatever job it receives, and exits on shutdown or
    /// when the pool holds more workers than desired.
    /// </summary>
    /// <param name="cpuID">ID assigned to this worker; also its key in <see cref="_tasks"/>.</param>
    private async Task ThreadBody(int cpuID)
    {
        _cpuId.Value = cpuID;
        AsyncLocalCurrentQueue.Value = this;

        try
        {
            while (true)
            {
                Report("Waiting", Percent.Zero, false);
                if (_shutdown.IsCancellationRequested) return;

                Func<Task> f;
                bool got;
                try
                {
                    (got, f) = await Queue.TryTake(PollMS, _shutdown.Token);
                }
                catch (Exception)
                {
                    // Any failure while polling is treated as a shutdown signal; the outer
                    // handler below swallows the cancellation and the worker ends quietly.
                    throw new OperationCanceledException();
                }

                if (got)
                {
                    try
                    {
                        await f();
                    }
                    catch (Exception ex) when (!_shutdown.IsCancellationRequested)
                    {
                        // A failing job must not take its worker down with it: the worker would
                        // stay registered in _tasks, so it would never be replaced and the pool
                        // would shrink silently. Log the failure and keep pulling jobs. During
                        // shutdown the filter lets the exception through to the handlers below.
                        Utils.Error(ex, "Error running task in WorkQueue thread.");
                    }
                }

                // Check if we're currently trimming threads
                if (DesiredNumWorkers >= _tasks.Count) continue;

                // Noticed that we may need to shut down, lock and check again
                using (await _lock.WaitAsync())
                {
                    // Check if another thread shut down before this one and got us back to the desired amount already
                    if (DesiredNumWorkers >= _tasks.Count) continue;

                    // Shutdown
                    if (!_tasks.Remove(cpuID))
                    {
                        Utils.Error($"Could not remove thread from workpool with CPU ID {cpuID}");
                    }

                    Report("Shutting down", Percent.Zero, false);
                    _cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way for a worker to end; nothing to report.
        }
        catch (Exception ex)
        {
            Utils.Error(ex, "Error in WorkQueue thread.");
        }
    }

    /// <summary>
    /// Publishes a status update on <see cref="Status"/>, tagged with the CPU ID of the calling async flow.
    /// </summary>
    /// <param name="msg">Description of the current activity.</param>
    /// <param name="progress">Progress value to attach to the update.</param>
    /// <param name="isWorking">Whether the caller should be shown as working. Defaults to true.</param>
    public void Report(string msg, Percent progress, bool isWorking = true)
    {
        _Status.OnNext(
            new CPUStatus
            {
                ProgressPercent = progress,
                Msg = msg,
                ID = _cpuId.Value,
                IsWorking = isWorking
            });
    }

    /// <summary>
    /// Adds a job to the queue for a worker to pick up.
    /// </summary>
    /// <param name="a">The job to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="a"/> is null.</exception>
    public void QueueTask(Func<Task> a)
    {
        // Reject null here, at the call site, instead of letting it fail later inside a worker.
        if (a == null) throw new ArgumentNullException(nameof(a));
        Queue.Add(a);
    }

    /// <summary>
    /// Signals the workers to stop, tears down the thread-count subscription and disposes the job queue.
    /// </summary>
    public void Dispose()
    {
        _shutdown.Cancel();
        _disposables.Dispose();
        Queue?.Dispose();
    }
}
/// <summary>
/// A single status update describing what a worker is doing. All setters are internal,
/// so instances can only be populated from inside this assembly (or assemblies it is visible to).
/// </summary>
public class CPUStatus
{
    /// <summary>Identifier of the worker this status refers to.</summary>
    public int ID { get; internal set; }

    /// <summary>Description of the current activity; defaults to an empty string.</summary>
    public string Msg { get; internal set; } = string.Empty;

    /// <summary>Progress value attached to this status.</summary>
    public Percent ProgressPercent { get; internal set; }

    /// <summary>Whether the worker is reported as working.</summary>
    public bool IsWorking { get; internal set; }
}
}