WorkQueue's desired threads obs can be set after ctor. CurrentCpuCount display

This commit is contained in:
Justin Swanson 2020-01-11 14:20:14 -06:00
parent ddbd1ef754
commit df20f65f90
7 changed files with 66 additions and 11 deletions

View File

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@ -29,9 +29,8 @@ namespace Wabbajack.Common
public IObservable<CPUStatus> Status => _Status;
private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned"
private int _desiredCount = 0;
private List<(int CpuID, Task Task)> _tasks = new List<(int CpuID, Task Task)>();
public int DesiredNumWorkers => _desiredCount;
public int DesiredNumWorkers { get; private set; } = 0;
private CancellationTokenSource _shutdown = new CancellationTokenSource();
@ -44,6 +43,11 @@ namespace Wabbajack.Common
private AsyncLock _lock = new AsyncLock();
private readonly BehaviorSubject<(int DesiredCPUs, int CurrentCPUs)> _cpuCountSubj = new BehaviorSubject<(int DesiredCPUs, int CurrentCPUs)>((0, 0));
public IObservable<(int CurrentCPUs, int DesiredCPUs)> CurrentCpuCount => _cpuCountSubj;
private readonly Subject<IObservable<int>> _activeNumThreadsObservable = new Subject<IObservable<int>>();
public WorkQueue(int? threadCount = null)
: this(Observable.Return(threadCount ?? Environment.ProcessorCount))
{
@ -51,19 +55,27 @@ namespace Wabbajack.Common
public WorkQueue(IObservable<int> numThreads)
{
(numThreads ?? Observable.Return(Environment.ProcessorCount))
_activeNumThreadsObservable
.Select(x => x ?? Observable.Return(Environment.ProcessorCount))
.Switch()
.DistinctUntilChanged()
.SelectTask(AddNewThreadsIfNeeded)
.Subscribe()
.DisposeWith(_disposables);
SetActiveThreadsObservable(numThreads);
}
public void SetActiveThreadsObservable(IObservable<int> numThreads)
{
_activeNumThreadsObservable.OnNext(numThreads);
}
private async Task AddNewThreadsIfNeeded(int desired)
{
using (await _lock.Wait())
{
_desiredCount = desired;
while (_desiredCount > _tasks.Count)
DesiredNumWorkers = desired;
while (DesiredNumWorkers > _tasks.Count)
{
var cpuID = _nextCpuID++;
_tasks.Add((cpuID,
@ -72,6 +84,7 @@ namespace Wabbajack.Common
await ThreadBody(cpuID);
})));
}
_cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
}
}
@ -99,13 +112,13 @@ namespace Wabbajack.Common
await f();
// Check if we're currently trimming threads
if (_desiredCount >= _tasks.Count) continue;
if (DesiredNumWorkers >= _tasks.Count) continue;
// Noticed that we may need to shut down, lock and check again
using (await _lock.Wait())
{
// Check if another thread shut down before this one and got us in line
if (_desiredCount >= _tasks.Count) continue;
if (DesiredNumWorkers >= _tasks.Count) continue;
Report("Shutting down", 0, false);
// Remove this task from list
@ -114,6 +127,7 @@ namespace Wabbajack.Common
if (_tasks[i].CpuID == cpuID)
{
_tasks.RemoveAt(i);
_cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
// Shutdown thread
Report("Shutting down", 0, false);
return;

View File

@ -13,7 +13,7 @@ namespace Wabbajack.Lib
{
public abstract class ABatchProcessor : IBatchProcessor
{
public WorkQueue Queue { get; private set; }
public WorkQueue Queue { get; } = new WorkQueue();
public Context VFS { get; private set; }
@ -59,7 +59,7 @@ namespace Wabbajack.Lib
{
throw new InvalidDataException("Can't configure a processor twice");
}
Queue = new WorkQueue(numThreads);
Queue.SetActiveThreadsObservable(numThreads);
UpdateTracker = new StatusUpdateTracker(steps);
Queue.Status.Subscribe(_queueStatus)
.DisposeWith(_subs);

View File

@ -67,6 +67,9 @@ namespace Wabbajack
private readonly ObservableAsPropertyHelper<string> _progressTitle;
public string ProgressTitle => _progressTitle.Value;
private readonly ObservableAsPropertyHelper<(int CurrentCPUs, int DesiredCPUs)> _CurrentCpuCount;
public (int CurrentCPUs, int DesiredCPUs) CurrentCpuCount => _CurrentCpuCount.Value;
public CompilerVM(MainWindowVM mainWindowVM)
{
MWVM = mainWindowVM;
@ -256,6 +259,11 @@ namespace Wabbajack
}
})
.ToProperty(this, nameof(ProgressTitle));
_CurrentCpuCount = this.WhenAny(x => x.Compiler.ActiveCompilation.Queue.CurrentCpuCount)
.Switch()
.ObserveOnGuiThread()
.ToProperty(this, nameof(CurrentCpuCount));
}
}
}

View File

@ -87,6 +87,9 @@ namespace Wabbajack
private readonly ObservableAsPropertyHelper<IUserIntervention> _ActiveGlobalUserIntervention;
public IUserIntervention ActiveGlobalUserIntervention => _ActiveGlobalUserIntervention.Value;
private readonly ObservableAsPropertyHelper<(int CurrentCPUs, int DesiredCPUs)> _CurrentCpuCount;
public (int CurrentCPUs, int DesiredCPUs) CurrentCpuCount => _CurrentCpuCount.Value;
// Command properties
public IReactiveCommand ShowReportCommand { get; }
public IReactiveCommand OpenReadmeCommand { get; }
@ -373,6 +376,11 @@ namespace Wabbajack
{
Installer.AfterInstallNavigation();
});
_CurrentCpuCount = this.WhenAny(x => x.Installer.ActiveInstallation.Queue.CurrentCpuCount)
.Switch()
.ObserveOnGuiThread()
.ToProperty(this, nameof(CurrentCpuCount));
}
private void ShowReport()

View File

@ -12,5 +12,6 @@ namespace Wabbajack
{
ObservableCollectionExtended<CPUDisplayVM> StatusList { get; }
MainWindowVM MWVM { get; }
(int CurrentCPUs, int DesiredCPUs) CurrentCpuCount { get; }
}
}

View File

@ -61,6 +61,9 @@
<Grid.ColumnDefinitions>
<ColumnDefinition Width="Auto" />
<ColumnDefinition Width="*" />
<ColumnDefinition Width="Auto" />
<ColumnDefinition Width="Auto" />
<ColumnDefinition Width="Auto" />
</Grid.ColumnDefinitions>
<TextBlock
Margin="5,0"
@ -69,10 +72,19 @@
<Slider
x:Name="TargetPercentageSlider"
Grid.Column="1"
Margin="2,0,6,0"
Margin="2,0,4,0"
Maximum="1"
Minimum="0.1"
Orientation="Horizontal" />
<TextBlock
x:Name="PercentageText"
Grid.Column="2"
Margin="2,0,4,0" />
<TextBlock Grid.Column="3" Text="|" />
<TextBlock
x:Name="CpuCountText"
Grid.Column="4"
Margin="2,0,6,0" />
</Grid>
</Border>
</Grid>

View File

@ -66,6 +66,18 @@ namespace Wabbajack
this.BindStrict(this.ViewModel, x => x.MWVM.Settings.Performance.TargetUsage, x => x.TargetPercentageSlider.Value)
.DisposeWith(disposable);
this.OneWayBindStrict(this.ViewModel, x => x.MWVM.Settings.Performance.TargetUsage, x => x.PercentageText.Text, x => $"{x.ToString("f2")}%")
.DisposeWith(disposable);
this.WhenAny(x => x.ViewModel.CurrentCpuCount)
.DistinctUntilChanged()
.ObserveOnGuiThread()
.Subscribe(x =>
{
this.CpuCountText.Text = $"{x.CurrentCPUs} / {x.DesiredCPUs}";
})
.DisposeWith(disposable);
});
}
}