Initial dynamic WorkQueue threads system

This commit is contained in:
Justin Swanson 2020-01-09 22:55:57 -06:00
parent 265953499d
commit 818d92320e
12 changed files with 173 additions and 92 deletions

View File

@ -90,6 +90,11 @@ namespace Wabbajack.Common
return msg;
}
public static void Error(string errMessage)
{
Log(errMessage);
}
public static void Error(Exception ex, string extraMessage = null)
{
Log(new GenericException(ex, extraMessage));
@ -932,7 +937,7 @@ namespace Wabbajack.Common
{
var startTime = DateTime.Now;
var seconds = 2;
var results = await Enumerable.Range(0, queue.ThreadCount)
var results = await Enumerable.Range(0, queue.DesiredNumWorkers)
.PMap(queue, idx =>
{
var random = new Random();

View File

@ -1,11 +1,13 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using DynamicData;
using Wabbajack.Common.StatusFeed;
namespace Wabbajack.Common
@ -26,33 +28,56 @@ namespace Wabbajack.Common
private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();
public IObservable<CPUStatus> Status => _Status;
public List<Task> Tasks { get; private set; }
private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned"
private int _desiredCount = 0;
private List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>();
public int DesiredNumWorkers => _desiredCount;
private CancellationTokenSource _cancel = new CancellationTokenSource();
private CancellationTokenSource _shutdown = new CancellationTokenSource();
private CompositeDisposable _disposables = new CompositeDisposable();
// This is currently a lie, as it wires to the Utils singleton stream This is still good to have,
// so that logic related to a single WorkQueue can subscribe to this dummy member so that If/when we
// implement log messages in a non-singleton fashion, they will already be wired up properly.
public IObservable<IStatusMessage> LogMessages => Utils.LogMessages;
public int ThreadCount { get; private set; }
private AsyncLock _lock = new AsyncLock();
public WorkQueue(int? threadCount = null)
: this(Observable.Return(threadCount ?? Environment.ProcessorCount))
{
ThreadCount = threadCount ?? Environment.ProcessorCount;
Tasks = Enumerable.Range(1, ThreadCount)
.Select(idx =>
{
return Task.Run(async () =>
{
await ThreadBody(idx);
});
}).ToList();
}
private async Task ThreadBody(int idx)
public WorkQueue(IObservable<int> numThreads)
{
_cpuId.Value = idx;
(numThreads ?? Observable.Return(Environment.ProcessorCount))
.DistinctUntilChanged()
.SelectTask(AddNewThreadsIfNeeded)
.Subscribe()
.DisposeWith(_disposables);
}
private async Task AddNewThreadsIfNeeded(int desired)
{
using (await _lock.Wait())
{
_desiredCount = desired;
while (_desiredCount > _tasks.Count)
{
var cpuID = _nextCpuID++;
_tasks.Add((cpuID,
Task.Run(async () =>
{
await ThreadBody(cpuID);
})));
}
}
}
private async Task ThreadBody(int cpuID)
{
_cpuId.Value = cpuID;
AsyncLocalCurrentQueue.Value = this;
try
@ -60,11 +85,11 @@ namespace Wabbajack.Common
while (true)
{
Report("Waiting", 0, false);
if (_cancel.IsCancellationRequested) return;
if (_shutdown.IsCancellationRequested) return;
Func<Task> f;
try
{
f = Queue.Take(_cancel.Token);
f = Queue.Take(_shutdown.Token);
}
catch (Exception)
{
@ -72,6 +97,32 @@ namespace Wabbajack.Common
}
await f();
// Check if we're currently trimming threads
if (_desiredCount >= _tasks.Count) continue;
// Noticed that we may need to shut down, lock and check again
using (await _lock.Wait())
{
// Check if another thread shut down before this one and got us in line
if (_desiredCount >= _tasks.Count) continue;
Report("Shutting down", 0, false);
// Remove this task from list
for (int i = 0; i < _tasks.Count; i++)
{
if (_tasks[i].CpuID == cpuID)
{
_tasks.RemoveAt(i);
// Shutdown thread
Report("Shutting down", 0, false);
return;
}
}
// Failed to remove, warn and then shutdown anyway
Utils.Error($"Could not remove thread from workpool with CPU ID {cpuID}");
return;
}
}
}
catch (OperationCanceledException)
@ -103,7 +154,8 @@ namespace Wabbajack.Common
public void Dispose()
{
_cancel.Cancel();
_shutdown.Cancel();
_disposables.Dispose();
Queue?.Dispose();
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
@ -48,17 +49,17 @@ namespace Wabbajack.Lib
private readonly CompositeDisposable _subs = new CompositeDisposable();
// WorkQueue settings
public bool ManualCoreLimit = true;
public byte MaxCores = byte.MaxValue;
public double TargetUsagePercent = 1.0d;
public BehaviorSubject<bool> ManualCoreLimit = new BehaviorSubject<bool>(true);
public BehaviorSubject<byte> MaxCores = new BehaviorSubject<byte>(byte.MaxValue);
public BehaviorSubject<double> TargetUsagePercent = new BehaviorSubject<double>(1.0d);
protected void ConfigureProcessor(int steps, int threads = 0)
protected void ConfigureProcessor(int steps, IObservable<int> numThreads = null)
{
if (1 == Interlocked.CompareExchange(ref _configured, 1, 1))
{
throw new InvalidDataException("Can't configure a processor twice");
}
Queue = new WorkQueue(threads);
Queue = new WorkQueue(numThreads);
UpdateTracker = new StatusUpdateTracker(steps);
Queue.Status.Subscribe(_queueStatus)
.DisposeWith(_subs);
@ -79,23 +80,34 @@ namespace Wabbajack.Lib
var scratch_size = await RecommendQueueSize(Directory.GetCurrentDirectory());
var result = Math.Min((int)based_on_memory, (int)scratch_size);
Utils.Log($"Recommending a queue size of {result} based on disk performance, number of cores, and {((long)memory.ullTotalPhys).ToFileSizeString()} of system RAM");
if (ManualCoreLimit)
{
if (result > MaxCores)
{
Utils.Log($"Only using {MaxCores} due to user preferences.");
}
result = MaxCores;
}
else if (TargetUsagePercent < 1.0d && TargetUsagePercent > 0d)
{
result = (int)Math.Ceiling(result * TargetUsagePercent);
result = Math.Max(1, result);
Utils.Log($"Only using {result} due to user scaling preferences of {(TargetUsagePercent * 100)}%.");
}
return result;
}
public IObservable<int> ConstructDynamicNumThreads(int recommendedCount)
{
return Observable.CombineLatest(
ManualCoreLimit,
MaxCores,
TargetUsagePercent,
resultSelector: (manual, max, target) =>
{
if (manual)
{
if (recommendedCount > max)
{
Utils.Log($"Only using {max} due to user preferences.");
}
return Math.Min(max, recommendedCount);
}
else if (target < 1.0d && target > 0d)
{
var ret = (int)Math.Ceiling(recommendedCount * target);
return Math.Max(1, ret);
}
return recommendedCount;
});
}
public static async Task<int> RecommendQueueSize(string folder)
{
if (!Directory.Exists(folder))
@ -141,5 +153,7 @@ namespace Wabbajack.Lib
Queue?.Dispose();
_isRunning.OnNext(false);
}
public void Add(IDisposable disposable) => _subs.Add(disposable);
}
}

View File

@ -82,7 +82,7 @@ namespace Wabbajack.Lib
protected override async Task<bool> _Begin(CancellationToken cancel)
{
if (cancel.IsCancellationRequested) return false;
ConfigureProcessor(19);
ConfigureProcessor(19, ConstructDynamicNumThreads(await RecommendQueueSize()));
UpdateTracker.Reset();
UpdateTracker.NextStep("Gathering information");
Info("Looking for other profiles");

View File

@ -45,7 +45,7 @@ namespace Wabbajack.Lib
if (cancel.IsCancellationRequested) return false;
var metric = Metrics.Send("begin_install", ModList.Name);
ConfigureProcessor(19, await RecommendQueueSize());
ConfigureProcessor(19, ConstructDynamicNumThreads(await RecommendQueueSize()));
var game = ModList.GameType.MetaData();
if (GameFolder == null)

View File

@ -85,7 +85,7 @@ namespace Wabbajack.Lib
Info($"Starting Vortex compilation for {GameName} at {GamePath} with staging folder at {StagingFolder} and downloads folder at {DownloadsFolder}.");
ConfigureProcessor(12);
ConfigureProcessor(12, ConstructDynamicNumThreads(await RecommendQueueSize()));
UpdateTracker.Reset();
if (cancel.IsCancellationRequested) return false;

View File

@ -51,7 +51,7 @@ namespace Wabbajack.Lib
}
if (cancel.IsCancellationRequested) return false;
ConfigureProcessor(10, await RecommendQueueSize());
ConfigureProcessor(10, ConstructDynamicNumThreads(await RecommendQueueSize()));
Directory.CreateDirectory(DownloadFolder);
if (cancel.IsCancellationRequested) return false;

View File

@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using Wabbajack.Common;
using Wabbajack.Lib;
@ -83,6 +84,19 @@ namespace Wabbajack
private double _TargetUsage = 1.0d;
public double TargetUsage { get => _TargetUsage; set => this.RaiseAndSetIfChanged(ref _TargetUsage, value); }
public void AttachToBatchProcessor(ABatchProcessor processor)
{
processor.Add(
this.WhenAny(x => x.Manual)
.Subscribe(processor.ManualCoreLimit));
processor.Add(
this.WhenAny(x => x.MaxCores)
.Subscribe(processor.MaxCores));
processor.Add(
this.WhenAny(x => x.TargetUsage)
.Subscribe(processor.TargetUsagePercent));
}
}
public class CompilationModlistSettings

View File

@ -180,7 +180,7 @@ namespace Wabbajack
try
{
ActiveCompilation = new MO2Compiler(
using (ActiveCompilation = new MO2Compiler(
mo2Folder: Mo2Folder,
mo2Profile: MOProfile,
outputFile: outputFile)
@ -192,16 +192,16 @@ namespace Wabbajack
ModListWebsite = ModlistSettings.Website,
ModListReadme = ModlistSettings.ReadmeIsWebsite ? ModlistSettings.ReadmeWebsite : ModlistSettings.ReadmeFilePath.TargetPath,
ReadmeIsWebsite = ModlistSettings.ReadmeIsWebsite,
ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual,
MaxCores = Parent.MWVM.Settings.Performance.MaxCores,
TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage,
};
await ActiveCompilation.Begin();
})
{
Parent.MWVM.Settings.Performance.AttachToBatchProcessor(ActiveCompilation);
await ActiveCompilation.Begin();
}
}
finally
{
StatusTracker = null;
ActiveCompilation.Dispose();
ActiveCompilation = null;
}
}

View File

@ -189,7 +189,7 @@ namespace Wabbajack
}
try
{
ActiveCompilation = new VortexCompiler(
using (ActiveCompilation = new VortexCompiler(
game: SelectedGame.Game,
gamePath: GameLocation.TargetPath,
vortexFolder: VortexCompiler.TypicalVortexFolder(),
@ -204,11 +204,11 @@ namespace Wabbajack
ModListWebsite = ModlistSettings.Website,
ModListReadme = ModlistSettings.ReadmeIsWebsite ? ModlistSettings.ReadmeWebsite : ModlistSettings.ReadmeFilePath.TargetPath,
ReadmeIsWebsite = ModlistSettings.ReadmeIsWebsite,
ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual,
MaxCores = Parent.MWVM.Settings.Performance.MaxCores,
TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage,
};
await ActiveCompilation.Begin();
})
{
Parent.MWVM.Settings.Performance.AttachToBatchProcessor(ActiveCompilation);
await ActiveCompilation.Begin();
}
}
finally
{

View File

@ -146,32 +146,30 @@ namespace Wabbajack
public async Task Install()
{
var installer = new MO2Installer(
using (var installer = new MO2Installer(
archive: Parent.ModListLocation.TargetPath,
modList: Parent.ModList.SourceModList,
outputFolder: Location.TargetPath,
downloadFolder: DownloadLocation.TargetPath,
parameters: SystemParametersConstructor.Create())
parameters: SystemParametersConstructor.Create()))
{
ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual,
MaxCores = Parent.MWVM.Settings.Performance.MaxCores,
TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage,
};
Parent.MWVM.Settings.Performance.AttachToBatchProcessor(installer);
await Task.Run(async () =>
{
try
await Task.Run(async () =>
{
var workTask = installer.Begin();
ActiveInstallation = installer;
await workTask;
return ErrorResponse.Success;
}
finally
{
ActiveInstallation = null;
}
});
try
{
var workTask = installer.Begin();
ActiveInstallation = installer;
await workTask;
return ErrorResponse.Success;
}
finally
{
ActiveInstallation = null;
}
});
}
}
}
}

View File

@ -63,31 +63,29 @@ namespace Wabbajack
var download = VortexCompiler.RetrieveDownloadLocation(TargetGame);
var staging = VortexCompiler.RetrieveStagingLocation(TargetGame);
installer = new VortexInstaller(
using (installer = new VortexInstaller(
archive: Parent.ModListLocation.TargetPath,
modList: Parent.ModList.SourceModList,
outputFolder: staging,
downloadFolder: download,
parameters: SystemParametersConstructor.Create())
parameters: SystemParametersConstructor.Create()))
{
ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual,
MaxCores = Parent.MWVM.Settings.Performance.MaxCores,
TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage,
};
Parent.MWVM.Settings.Performance.AttachToBatchProcessor(installer);
await Task.Run(async () =>
{
try
await Task.Run(async () =>
{
var workTask = installer.Begin();
ActiveInstallation = installer;
await workTask;
}
finally
{
ActiveInstallation = null;
}
});
try
{
var workTask = installer.Begin();
ActiveInstallation = installer;
await workTask;
}
finally
{
ActiveInstallation = null;
}
});
}
}
}
}