diff --git a/Wabbajack.Common/Utils.cs b/Wabbajack.Common/Utils.cs index 377bf7e0..918230cf 100644 --- a/Wabbajack.Common/Utils.cs +++ b/Wabbajack.Common/Utils.cs @@ -90,6 +90,11 @@ namespace Wabbajack.Common return msg; } + public static void Error(string errMessage) + { + Log(errMessage); + } + public static void Error(Exception ex, string extraMessage = null) { Log(new GenericException(ex, extraMessage)); @@ -932,7 +937,7 @@ namespace Wabbajack.Common { var startTime = DateTime.Now; var seconds = 2; - var results = await Enumerable.Range(0, queue.ThreadCount) + var results = await Enumerable.Range(0, queue.DesiredNumWorkers) .PMap(queue, idx => { var random = new Random(); diff --git a/Wabbajack.Common/WorkQueue.cs b/Wabbajack.Common/WorkQueue.cs index 4552db0b..6f9891f9 100644 --- a/Wabbajack.Common/WorkQueue.cs +++ b/Wabbajack.Common/WorkQueue.cs @@ -1,11 +1,13 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; +using DynamicData; using Wabbajack.Common.StatusFeed; namespace Wabbajack.Common @@ -26,33 +28,56 @@ namespace Wabbajack.Common private readonly Subject _Status = new Subject(); public IObservable Status => _Status; - public List Tasks { get; private set; } + private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned" + private int _desiredCount = 0; + private List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>(); + public int DesiredNumWorkers => _desiredCount; - private CancellationTokenSource _cancel = new CancellationTokenSource(); + private CancellationTokenSource _shutdown = new CancellationTokenSource(); + + private CompositeDisposable _disposables = new CompositeDisposable(); // This is currently a lie, as it wires to the Utils singleton stream This is still good to have, // so that logic related to a single WorkQueue can subscribe to this dummy member so that If/when we // implement log messages in a non-singleton fashion, they will already be wired up properly. public IObservable LogMessages => Utils.LogMessages; - public int ThreadCount { get; private set; } + private AsyncLock _lock = new AsyncLock(); public WorkQueue(int? threadCount = null) + : this(Observable.Return(threadCount ?? Environment.ProcessorCount)) { - ThreadCount = threadCount ?? Environment.ProcessorCount; - Tasks = Enumerable.Range(1, ThreadCount) - .Select(idx => - { - return Task.Run(async () => - { - await ThreadBody(idx); - }); - }).ToList(); } - private async Task ThreadBody(int idx) + public WorkQueue(IObservable numThreads) { - _cpuId.Value = idx; + (numThreads ?? Observable.Return(Environment.ProcessorCount)) + .DistinctUntilChanged() + .SelectTask(AddNewThreadsIfNeeded) + .Subscribe() + .DisposeWith(_disposables); + } + + private async Task AddNewThreadsIfNeeded(int desired) + { + using (await _lock.Wait()) + { + _desiredCount = desired; + while (_desiredCount > _tasks.Count) + { + var cpuID = _nextCpuID++; + _tasks.Add((cpuID, + Task.Run(async () => + { + await ThreadBody(cpuID); + }))); + } + } + } + + private async Task ThreadBody(int cpuID) + { + _cpuId.Value = cpuID; AsyncLocalCurrentQueue.Value = this; try @@ -60,11 +85,11 @@ namespace Wabbajack.Common while (true) { Report("Waiting", 0, false); - if (_cancel.IsCancellationRequested) return; + if (_shutdown.IsCancellationRequested) return; Func f; try { - f = Queue.Take(_cancel.Token); + f = Queue.Take(_shutdown.Token); } catch (Exception) { @@ -72,6 +97,32 @@ namespace Wabbajack.Common } await f(); + + // Check if we're currently trimming threads + if (_desiredCount >= _tasks.Count) continue; + + // Noticed that we may need to shut down, lock and check again + using (await _lock.Wait()) + { + // Check if another thread shut down before this one and got us in line + if (_desiredCount >= _tasks.Count) continue; + + Report("Shutting down", 0, false); + // Remove this task from list + for (int i = 0; i < _tasks.Count; i++) + { + if (_tasks[i].CpuID == cpuID) + { + _tasks.RemoveAt(i); + // Shutdown thread + Report("Shutting down", 0, false); + return; + } + } + // Failed to remove, warn and then shutdown anyway + Utils.Error($"Could not remove thread from workpool with CPU ID {cpuID}"); + return; + } } } catch (OperationCanceledException) @@ -103,7 +154,8 @@ namespace Wabbajack.Common public void Dispose() { - _cancel.Cancel(); + _shutdown.Cancel(); + _disposables.Dispose(); Queue?.Dispose(); } } diff --git a/Wabbajack.Lib/ABatchProcessor.cs b/Wabbajack.Lib/ABatchProcessor.cs index ad4347fc..0d3df2c7 100644 --- a/Wabbajack.Lib/ABatchProcessor.cs +++ b/Wabbajack.Lib/ABatchProcessor.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.Reactive.Disposables; +using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; @@ -48,17 +49,17 @@ namespace Wabbajack.Lib private readonly CompositeDisposable _subs = new CompositeDisposable(); // WorkQueue settings - public bool ManualCoreLimit = true; - public byte MaxCores = byte.MaxValue; - public double TargetUsagePercent = 1.0d; + public BehaviorSubject ManualCoreLimit = new BehaviorSubject(true); + public BehaviorSubject MaxCores = new BehaviorSubject(byte.MaxValue); + public BehaviorSubject TargetUsagePercent = new BehaviorSubject(1.0d); - protected void ConfigureProcessor(int steps, int threads = 0) + protected void ConfigureProcessor(int steps, IObservable numThreads = null) { if (1 == Interlocked.CompareExchange(ref _configured, 1, 1)) { throw new InvalidDataException("Can't configure a processor twice"); } - Queue = new WorkQueue(threads); + Queue = new WorkQueue(numThreads); UpdateTracker = new StatusUpdateTracker(steps); Queue.Status.Subscribe(_queueStatus) .DisposeWith(_subs); @@ -79,23 +80,34 @@ namespace Wabbajack.Lib var scratch_size = await RecommendQueueSize(Directory.GetCurrentDirectory()); var result = Math.Min((int)based_on_memory, (int)scratch_size); Utils.Log($"Recommending a queue size of {result} based on disk performance, number of cores, and {((long)memory.ullTotalPhys).ToFileSizeString()} of system RAM"); - if (ManualCoreLimit) - { - if (result > MaxCores) - { - Utils.Log($"Only using {MaxCores} due to user preferences."); - } - result = MaxCores; - } - else if (TargetUsagePercent < 1.0d && TargetUsagePercent > 0d) - { - result = (int)Math.Ceiling(result * TargetUsagePercent); - result = Math.Max(1, result); - Utils.Log($"Only using {result} due to user scaling preferences of {(TargetUsagePercent * 100)}%."); - } return result; } + public IObservable ConstructDynamicNumThreads(int recommendedCount) + { + return Observable.CombineLatest( + ManualCoreLimit, + MaxCores, + TargetUsagePercent, + resultSelector: (manual, max, target) => + { + if (manual) + { + if (recommendedCount > max) + { + Utils.Log($"Only using {max} due to user preferences."); + } + return Math.Min(max, recommendedCount); + } + else if (target < 1.0d && target > 0d) + { + var ret = (int)Math.Ceiling(recommendedCount * target); + return Math.Max(1, ret); + } + return recommendedCount; + }); + } + public static async Task RecommendQueueSize(string folder) { if (!Directory.Exists(folder)) @@ -141,5 +153,7 @@ namespace Wabbajack.Lib Queue?.Dispose(); _isRunning.OnNext(false); } + + public void Add(IDisposable disposable) => _subs.Add(disposable); } } diff --git a/Wabbajack.Lib/MO2Compiler.cs b/Wabbajack.Lib/MO2Compiler.cs index c3f2a80f..fe3dceee 100644 --- a/Wabbajack.Lib/MO2Compiler.cs +++ b/Wabbajack.Lib/MO2Compiler.cs @@ -82,7 +82,7 @@ namespace Wabbajack.Lib protected override async Task _Begin(CancellationToken cancel) { if (cancel.IsCancellationRequested) return false; - ConfigureProcessor(19); + ConfigureProcessor(19, ConstructDynamicNumThreads(await RecommendQueueSize())); UpdateTracker.Reset(); UpdateTracker.NextStep("Gathering information"); Info("Looking for other profiles"); diff --git a/Wabbajack.Lib/MO2Installer.cs b/Wabbajack.Lib/MO2Installer.cs index 8827739f..94e662ad 100644 --- a/Wabbajack.Lib/MO2Installer.cs +++ b/Wabbajack.Lib/MO2Installer.cs @@ -45,7 +45,7 @@ namespace Wabbajack.Lib if (cancel.IsCancellationRequested) return false; var metric = Metrics.Send("begin_install", ModList.Name); - ConfigureProcessor(19, await RecommendQueueSize()); + ConfigureProcessor(19, ConstructDynamicNumThreads(await RecommendQueueSize())); var game = ModList.GameType.MetaData(); if (GameFolder == null) diff --git a/Wabbajack.Lib/VortexCompiler.cs b/Wabbajack.Lib/VortexCompiler.cs index adb0bb49..846204b3 100644 --- a/Wabbajack.Lib/VortexCompiler.cs +++ b/Wabbajack.Lib/VortexCompiler.cs @@ -85,7 +85,7 @@ namespace Wabbajack.Lib Info($"Starting Vortex compilation for {GameName} at {GamePath} with staging folder at {StagingFolder} and downloads folder at {DownloadsFolder}."); - ConfigureProcessor(12); + ConfigureProcessor(12, ConstructDynamicNumThreads(await RecommendQueueSize())); UpdateTracker.Reset(); if (cancel.IsCancellationRequested) return false; diff --git a/Wabbajack.Lib/VortexInstaller.cs b/Wabbajack.Lib/VortexInstaller.cs index 2d86df47..2397515e 100644 --- a/Wabbajack.Lib/VortexInstaller.cs +++ b/Wabbajack.Lib/VortexInstaller.cs @@ -51,7 +51,7 @@ namespace Wabbajack.Lib } if (cancel.IsCancellationRequested) return false; - ConfigureProcessor(10, await RecommendQueueSize()); + ConfigureProcessor(10, ConstructDynamicNumThreads(await RecommendQueueSize())); Directory.CreateDirectory(DownloadFolder); if (cancel.IsCancellationRequested) return false; diff --git a/Wabbajack/Settings.cs b/Wabbajack/Settings.cs index e7f3757f..14cba302 100644 --- a/Wabbajack/Settings.cs +++ b/Wabbajack/Settings.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.IO; using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Subjects; using Wabbajack.Common; using Wabbajack.Lib; @@ -83,6 +84,19 @@ namespace Wabbajack private double _TargetUsage = 1.0d; public double TargetUsage { get => _TargetUsage; set => this.RaiseAndSetIfChanged(ref _TargetUsage, value); } + + public void AttachToBatchProcessor(ABatchProcessor processor) + { + processor.Add( + this.WhenAny(x => x.Manual) + .Subscribe(processor.ManualCoreLimit)); + processor.Add( + this.WhenAny(x => x.MaxCores) + .Subscribe(processor.MaxCores)); + processor.Add( + this.WhenAny(x => x.TargetUsage) + .Subscribe(processor.TargetUsagePercent)); + } } public class CompilationModlistSettings diff --git a/Wabbajack/View Models/Compilers/MO2CompilerVM.cs b/Wabbajack/View Models/Compilers/MO2CompilerVM.cs index dff1bf85..aeea2067 100644 --- a/Wabbajack/View Models/Compilers/MO2CompilerVM.cs +++ b/Wabbajack/View Models/Compilers/MO2CompilerVM.cs @@ -180,7 +180,7 @@ namespace Wabbajack try { - ActiveCompilation = new MO2Compiler( + using (ActiveCompilation = new MO2Compiler( mo2Folder: Mo2Folder, mo2Profile: MOProfile, outputFile: outputFile) @@ -192,16 +192,16 @@ namespace Wabbajack ModListWebsite = ModlistSettings.Website, ModListReadme = ModlistSettings.ReadmeIsWebsite ? ModlistSettings.ReadmeWebsite : ModlistSettings.ReadmeFilePath.TargetPath, ReadmeIsWebsite = ModlistSettings.ReadmeIsWebsite, - ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual, - MaxCores = Parent.MWVM.Settings.Performance.MaxCores, - TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage, - }; - await ActiveCompilation.Begin(); + }) + { + Parent.MWVM.Settings.Performance.AttachToBatchProcessor(ActiveCompilation); + + await ActiveCompilation.Begin(); + } } finally { StatusTracker = null; - ActiveCompilation.Dispose(); ActiveCompilation = null; } } diff --git a/Wabbajack/View Models/Compilers/VortexCompilerVM.cs b/Wabbajack/View Models/Compilers/VortexCompilerVM.cs index 57266209..8dd689d5 100644 --- a/Wabbajack/View Models/Compilers/VortexCompilerVM.cs +++ b/Wabbajack/View Models/Compilers/VortexCompilerVM.cs @@ -189,7 +189,7 @@ namespace Wabbajack } try { - ActiveCompilation = new VortexCompiler( + using (ActiveCompilation = new VortexCompiler( game: SelectedGame.Game, gamePath: GameLocation.TargetPath, vortexFolder: VortexCompiler.TypicalVortexFolder(), @@ -204,11 +204,11 @@ namespace Wabbajack ModListWebsite = ModlistSettings.Website, ModListReadme = ModlistSettings.ReadmeIsWebsite ? ModlistSettings.ReadmeWebsite : ModlistSettings.ReadmeFilePath.TargetPath, ReadmeIsWebsite = ModlistSettings.ReadmeIsWebsite, - ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual, - MaxCores = Parent.MWVM.Settings.Performance.MaxCores, - TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage, - }; - await ActiveCompilation.Begin(); + }) + { + Parent.MWVM.Settings.Performance.AttachToBatchProcessor(ActiveCompilation); + await ActiveCompilation.Begin(); + } } finally { diff --git a/Wabbajack/View Models/Installers/MO2InstallerVM.cs b/Wabbajack/View Models/Installers/MO2InstallerVM.cs index f5c92e7f..6a796256 100644 --- a/Wabbajack/View Models/Installers/MO2InstallerVM.cs +++ b/Wabbajack/View Models/Installers/MO2InstallerVM.cs @@ -146,32 +146,30 @@ namespace Wabbajack public async Task Install() { - var installer = new MO2Installer( + using (var installer = new MO2Installer( archive: Parent.ModListLocation.TargetPath, modList: Parent.ModList.SourceModList, outputFolder: Location.TargetPath, downloadFolder: DownloadLocation.TargetPath, - parameters: SystemParametersConstructor.Create()) + parameters: SystemParametersConstructor.Create())) { - ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual, - MaxCores = Parent.MWVM.Settings.Performance.MaxCores, - TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage, - }; + Parent.MWVM.Settings.Performance.AttachToBatchProcessor(installer); - await Task.Run(async () => - { - try + await Task.Run(async () => { - var workTask = installer.Begin(); - ActiveInstallation = installer; - await workTask; - return ErrorResponse.Success; - } - finally - { - ActiveInstallation = null; - } - }); + try + { + var workTask = installer.Begin(); + ActiveInstallation = installer; + await workTask; + return ErrorResponse.Success; + } + finally + { + ActiveInstallation = null; + } + }); + } } } } diff --git a/Wabbajack/View Models/Installers/VortexInstallerVM.cs b/Wabbajack/View Models/Installers/VortexInstallerVM.cs index 182cbd7b..f4606f07 100644 --- a/Wabbajack/View Models/Installers/VortexInstallerVM.cs +++ b/Wabbajack/View Models/Installers/VortexInstallerVM.cs @@ -63,31 +63,29 @@ namespace Wabbajack var download = VortexCompiler.RetrieveDownloadLocation(TargetGame); var staging = VortexCompiler.RetrieveStagingLocation(TargetGame); - installer = new VortexInstaller( + using (installer = new VortexInstaller( archive: Parent.ModListLocation.TargetPath, modList: Parent.ModList.SourceModList, outputFolder: staging, downloadFolder: download, - parameters: SystemParametersConstructor.Create()) + parameters: SystemParametersConstructor.Create())) { - ManualCoreLimit = Parent.MWVM.Settings.Performance.Manual, - MaxCores = Parent.MWVM.Settings.Performance.MaxCores, - TargetUsagePercent = Parent.MWVM.Settings.Performance.TargetUsage, - }; + Parent.MWVM.Settings.Performance.AttachToBatchProcessor(installer); - await Task.Run(async () => - { - try + await Task.Run(async () => { - var workTask = installer.Begin(); - ActiveInstallation = installer; - await workTask; - } - finally - { - ActiveInstallation = null; - } - }); + try + { + var workTask = installer.Begin(); + ActiveInstallation = installer; + await workTask; + } + finally + { + ActiveInstallation = null; + } + }); + } } } }